You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/17 01:57:38 UTC

[incubator-iotdb] branch dev_merge updated: add merge statistics

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_merge by this push:
     new 354d637  add merge statistics
354d637 is described below

commit 354d63704ff8a2c8faed8e7e369f0c5adb03deae
Author: 江天 <jt...@163.com>
AuthorDate: Wed Jul 17 09:54:58 2019 +0800

    add merge statistics
---
 .../apache/iotdb/db/engine/merge/MergeTask.java    | 29 +++++++++++++++++++---
 1 file changed, 26 insertions(+), 3 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
index 134763d..6244f10 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
@@ -110,6 +110,8 @@ public class MergeTask implements Callable<Void> {
   // future feature
   private boolean fullMerge;
 
+  private int totalChunkWritten;
+
   public MergeTask(List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles, String storageGroupDir, MergeCallback callback,
       String taskName, boolean fullMerge) throws IOException {
@@ -141,6 +143,7 @@ public class MergeTask implements Callable<Void> {
           unseqFiles.size());
     }
     long startTime = System.currentTimeMillis();
+    long totalFileSize = collectFileSizes(seqFiles, unseqFiles);
     this.mergeLogger = new MergeLogger(storageGroupDir);
     logFiles();
 
@@ -152,8 +155,26 @@ public class MergeTask implements Callable<Void> {
 
     cleanUp(true);
     if (logger.isInfoEnabled()) {
-      logger.info("{} ends after {}ms", taskName, System.currentTimeMillis() - startTime);
+      double elapsedTime = (double) (System.currentTimeMillis() - startTime);
+      double byteRate = totalFileSize / elapsedTime / 1024;
+      double seriesRate = unmergedSeries.size() / elapsedTime;
+      double chunkRate = totalChunkWritten / elapsedTime;
+      double fileRate = (seqFiles.size() + unseqFiles.size()) / elapsedTime;
+      logger.info("{} ends after {}ms, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
+              + "fileRate: {}/s",
+          taskName, elapsedTime, byteRate, seriesRate, chunkRate, fileRate);
+    }
+  }
+
+  private long collectFileSizes(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
+    long totalSize = 0;
+    for (TsFileResource tsFileResource : seqFiles) {
+      totalSize += tsFileResource.getFileSize();
+    }
+    for (TsFileResource tsFileResource : unseqFiles) {
+      totalSize += tsFileResource.getFileSize();
     }
+    return totalSize;
   }
 
   void mergeFiles(List<TsFileResource> unmergedFiles) throws IOException {
@@ -215,11 +236,11 @@ public class MergeTask implements Callable<Void> {
       mergeOnePath(path);
       mergeLogger.logTSEnd(path);
       mergedCnt ++;
-      if (logger.isDebugEnabled()) {
+      if (logger.isInfoEnabled()) {
         double newProgress = 100 * mergedCnt / (double) (unmergedSeries.size());
         if (newProgress - progress >= 0.01) {
           progress = newProgress;
-          logger.debug("{} has merged {}% series", taskName, progress);
+          logger.info("{} has merged {}% series", taskName, progress);
         }
       }
     }
@@ -548,6 +569,7 @@ public class MergeTask implements Callable<Void> {
       currTimeValuePair = unseqReader.hasNext() ? unseqReader.next() : null;
       cnt ++;
     }
+    totalChunkWritten ++;
     return cnt;
   }
 
@@ -584,6 +606,7 @@ public class MergeTask implements Callable<Void> {
         writeBatchPoint(batchData, i, dataType, chunkWriter);
       }
     }
+    totalChunkWritten ++;
     return chunk.getHeader().getNumOfPages();
   }