You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/17 01:57:38 UTC
[incubator-iotdb] branch dev_merge updated: add merge statistics
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_merge by this push:
new 354d637 add merge statistics
354d637 is described below
commit 354d63704ff8a2c8faed8e7e369f0c5adb03deae
Author: 江天 <jt...@163.com>
AuthorDate: Wed Jul 17 09:54:58 2019 +0800
add merge statistics
---
.../apache/iotdb/db/engine/merge/MergeTask.java | 29 +++++++++++++++++++---
1 file changed, 26 insertions(+), 3 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
index 134763d..6244f10 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
@@ -110,6 +110,8 @@ public class MergeTask implements Callable<Void> {
// future feature
private boolean fullMerge;
+ private int totalChunkWritten;
+
public MergeTask(List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles, String storageGroupDir, MergeCallback callback,
String taskName, boolean fullMerge) throws IOException {
@@ -141,6 +143,7 @@ public class MergeTask implements Callable<Void> {
unseqFiles.size());
}
long startTime = System.currentTimeMillis();
+ long totalFileSize = collectFileSizes(seqFiles, unseqFiles);
this.mergeLogger = new MergeLogger(storageGroupDir);
logFiles();
@@ -152,8 +155,26 @@ public class MergeTask implements Callable<Void> {
cleanUp(true);
if (logger.isInfoEnabled()) {
- logger.info("{} ends after {}ms", taskName, System.currentTimeMillis() - startTime);
+ double elapsedTime = (double) (System.currentTimeMillis() - startTime);
+ double byteRate = totalFileSize / elapsedTime / 1024;
+ double seriesRate = unmergedSeries.size() / elapsedTime;
+ double chunkRate = totalChunkWritten / elapsedTime;
+ double fileRate = (seqFiles.size() + unseqFiles.size()) / elapsedTime;
+ logger.info("{} ends after {}ms, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
+ + "fileRate: {}/s",
+ taskName, elapsedTime, byteRate, seriesRate, chunkRate, fileRate);
+ }
+ }
+
+ private long collectFileSizes(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) { // returns the sum of getFileSize() over every resource in both lists
+ long totalSize = 0; // running total; presumably bytes -- TODO confirm against TsFileResource.getFileSize()
+ for (TsFileResource tsFileResource : seqFiles) {
+ totalSize += tsFileResource.getFileSize();
+ }
+ for (TsFileResource tsFileResource : unseqFiles) {
+ totalSize += tsFileResource.getFileSize();
 }
+ return totalSize; // combined size of the seqFiles and unseqFiles entries
 }
void mergeFiles(List<TsFileResource> unmergedFiles) throws IOException {
@@ -215,11 +236,11 @@ public class MergeTask implements Callable<Void> {
mergeOnePath(path);
mergeLogger.logTSEnd(path);
mergedCnt ++;
- if (logger.isDebugEnabled()) {
+ if (logger.isInfoEnabled()) {
double newProgress = 100 * mergedCnt / (double) (unmergedSeries.size());
if (newProgress - progress >= 0.01) {
progress = newProgress;
- logger.debug("{} has merged {}% series", taskName, progress);
+ logger.info("{} has merged {}% series", taskName, progress);
}
}
}
@@ -548,6 +569,7 @@ public class MergeTask implements Callable<Void> {
currTimeValuePair = unseqReader.hasNext() ? unseqReader.next() : null;
cnt ++;
}
+ totalChunkWritten ++;
return cnt;
}
@@ -584,6 +606,7 @@ public class MergeTask implements Callable<Void> {
writeBatchPoint(batchData, i, dataType, chunkWriter);
}
}
+ totalChunkWritten ++;
return chunk.getHeader().getNumOfPages();
}