You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/16 03:26:11 UTC

[incubator-iotdb] branch dev_merge updated: add mergeChunkNumberThreshold

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_merge by this push:
     new 0e2b10d  add mergeChunkNumberThreshold
0e2b10d is described below

commit 0e2b10d958e5b4d4a26a1e815a081ad305ca3209
Author: 江天 <jt...@163.com>
AuthorDate: Tue Jul 16 11:23:48 2019 +0800

    add mergeChunkNumberThreshold
---
 iotdb/iotdb/conf/iotdb-engine.properties           | 18 +++++++----
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 35 ++++++++++++++++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  2 ++
 .../apache/iotdb/db/engine/merge/MergeTask.java    | 28 +++++++++--------
 .../iotdb/db/engine/merge/MergePerfTest.java       | 21 ++++---------
 .../iotdb/db/engine/merge/MergeTaskTest.java       | 23 ++++++++++++++
 .../apache/iotdb/db/engine/merge/MergeTest.java    | 13 ++++++--
 .../tsfile/file/metadata/ChunkGroupMetaData.java   |  2 +-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  2 +-
 9 files changed, 106 insertions(+), 38 deletions(-)

diff --git a/iotdb/iotdb/conf/iotdb-engine.properties b/iotdb/iotdb/conf/iotdb-engine.properties
index c25bcc1..924e71e 100644
--- a/iotdb/iotdb/conf/iotdb-engine.properties
+++ b/iotdb/iotdb/conf/iotdb-engine.properties
@@ -146,21 +146,27 @@ merge_thread_num=1
 # merge_memory_budget=2147483648
 
 # When set to true, if some crashed merges are detected during system rebooting, such merges will
-# be continued, otherwise, the unfinished part of such merges will not be restarted while the
-# finished part still remains as it is.
+# be continued, otherwise, the unfinished parts of such merges will not be continued while the
+# finished parts still remain as they are.
 # If you are feeling the rebooting is too slow, set this to false, true by default
 continue_merge_after_reboot=true
 
 # A global merge will be performed each such interval, that is, each storage group will be merged
-# (if proper merge candidates can be found). Unit: second, default: 2hours.
+# (if proper merge candidates can be found). Unit: second, default: 1 hour.
 # When less than or equal to 0, timed merge is disabled.
-merge_interval_sec=7200
+merge_interval_sec=3600
 
 # When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how
-# much they are overflowed). This may increase merge overhead significantly, do not open until
-# you need full merge urgently.
+# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles
+# are overflowed.
 force_full_merge=false
 
+# During a merge, if a chunk has fewer points than this parameter, it will be merged with its
+# succeeding chunks even if it is not overflowed, until the merged chunk reaches this threshold,
+# at which point the new chunk is flushed.
+# When less than 0, this mechanism is disabled.
+chunk_merge_point_threshold=4096
+
 
 ####################
 ### Statistics Monitor configuration
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1408dd1..150848b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -194,16 +194,43 @@ public class IoTDBConfig {
    */
   private boolean chunkBufferPoolEnable = false;
 
+  /**
+   * How much memory (in bytes) can be used by a single merge task.
+   */
   private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2);
 
+  /**
+   * How many thread will be set up to perform merges.
+   */
   private int mergeThreadNum = 1;
 
+  /**
+   * When set to true, if some crashed merges are detected during system rebooting, such merges will
+   * be continued, otherwise, the unfinished parts of such merges will not be continued while the
+   * finished parts still remain as they are.
+   */
   private boolean continueMergeAfterReboot = true;
 
+  /**
+   * A global merge will be performed each such interval, that is, each storage group will be merged
+   * (if proper merge candidates can be found). Unit: second.
+   */
   private long mergeIntervalSec = 2 * 3600L;
 
+  /**
+   * When set to true, all merges become full merges (the whole SeqFiles are re-written regardless
+   * of how much they are overflowed). This may increase merge overhead depending on how much the
+   * SeqFiles are overflowed.
+   */
   private boolean forceFullMerge = false;
 
+  /**
+   * During a merge, if a chunk has fewer points than this parameter, it will be merged with its
+   * succeeding chunks even if it is not overflowed, until the merged chunk reaches this threshold,
+   * at which point the new chunk is flushed. When less than 0, this mechanism is disabled.
+   */
+  private int chunkMergePointThreshold = 512;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -576,4 +603,12 @@ public class IoTDBConfig {
   public void setForceFullMerge(boolean forceFullMerge) {
     this.forceFullMerge = forceFullMerge;
   }
+
+  public int getChunkMergePointThreshold() {
+    return chunkMergePointThreshold;
+  }
+
+  public void setChunkMergePointThreshold(int chunkMergePointThreshold) {
+    this.chunkMergePointThreshold = chunkMergePointThreshold;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 568f3e1..49cbaba 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -210,6 +210,8 @@ public class IoTDBDescriptor {
           Long.toString(conf.getMergeIntervalSec()))));
       conf.setForceFullMerge(Boolean.parseBoolean(properties.getProperty("force_full_merge",
           Boolean.toString(conf.isForceFullMerge()))));
+      conf.setChunkMergePointThreshold(Integer.parseInt(properties.getProperty(
+          "chunk_merge_point_threshold", Integer.toString(conf.getChunkMergePointThreshold()))));
 
       conf.setEnablePerformanceStat(Boolean
           .parseBoolean(properties.getProperty("enable_performance_stat",
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
index ab83d2f..ad597aa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
@@ -38,12 +38,14 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactoryImpl;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -74,7 +76,7 @@ import org.slf4j.LoggerFactory;
  */
 public class MergeTask implements Callable<Void> {
 
-  private static final int MIN_CHUNK_POINT_NUM = 4*1024*1024;
+  private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold();
   public static final String MERGE_SUFFIX = ".merge";
   private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
 
@@ -167,16 +169,16 @@ public class MergeTask implements Callable<Void> {
       int unmergedChunkNum = unmergedChunkCnt.getOrDefault(seqFile, 0);
       if (mergedChunkNum >= unmergedChunkNum) {
         // move the unmerged data to the new file
-        if (logger.isDebugEnabled()) {
-          logger.debug("{} moving unmerged data of {} to the merged file", taskName,
-              seqFile.getFile().getName());
+        if (logger.isInfoEnabled()) {
+          logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+                  + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum);
         }
         moveUnmergedToNew(seqFile);
       } else {
         // move the merged data to the old file
-        if (logger.isDebugEnabled()) {
-          logger.debug("{} moving merged data of {} to the old file", taskName,
-              seqFile.getFile().getName());
+        if (logger.isInfoEnabled()) {
+          logger.info("{} moving merged data of {} to the old file {} merged chunks, {} "
+                  + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum);
         }
         moveMergedToOld(seqFile);
       }
@@ -401,7 +403,7 @@ public class MergeTask implements Callable<Void> {
         currTimeValuePair = unseqReader.next();
       }
       for (int i = 0; i < seqFiles.size(); i++) {
-        mergeOneFile(path, i, unseqReader, schema);
+        pathMergeOneFile(path, i, unseqReader, schema);
       }
     } catch (IOException e) {
       logger.error("Cannot read unseq data of {} during merge", path, e);
@@ -414,7 +416,7 @@ public class MergeTask implements Callable<Void> {
     }
   }
 
-  private void mergeOneFile(Path path, int seqFileIdx, IPointReader unseqReader,
+  private void pathMergeOneFile(Path path, int seqFileIdx, IPointReader unseqReader,
       MeasurementSchema measurementSchema)
       throws IOException {
     TsFileResource currTsFile = seqFiles.get(seqFileIdx);
@@ -480,13 +482,13 @@ public class MergeTask implements Callable<Void> {
         unmergedChunkStartTimes.get(currFile).get(path).add(currMeta.getStartTime());
       }
 
-      if (ptWritten >= MIN_CHUNK_POINT_NUM) {
+      if (minChunkPointNum >= 0 && ptWritten >= minChunkPointNum || ptWritten > 0 && minChunkPointNum < 0) {
         // the new chunk's size is large enough and it should be flushed
         chunkWriter.writeToFileWriter(mergeFileWriter);
         ptWritten = 0;
       }
     }
-    if (ptWritten >= 0) {
+    if (ptWritten > 0) {
       // the last merged chunk may still be smaller than the threshold, flush it anyway
       chunkWriter.writeToFileWriter(mergeFileWriter);
     }
@@ -509,7 +511,7 @@ public class MergeTask implements Callable<Void> {
     // a small chunk has been written, this chunk merge with it to create a larger chunk
     // or this chunk is too small and it is not the last chunk, merge it with the next chunk
     boolean chunkTooSmall =
-        ptWritten > 0 || (currMeta.getNumOfPoints() < MIN_CHUNK_POINT_NUM && !isLastChunk);
+        ptWritten > 0 || (minChunkPointNum >= 0 && currMeta.getNumOfPoints() < minChunkPointNum && !isLastChunk);
     boolean chunkModified = currMeta.getDeletedAt() > Long.MIN_VALUE;
     int newPtWritten = ptWritten;
 
@@ -662,6 +664,6 @@ public class MergeTask implements Callable<Void> {
 
   private IChunkWriter getChunkWriter(MeasurementSchema measurementSchema) {
     return  new ChunkWriterImpl(measurementSchema,
-        new ChunkBuffer(measurementSchema), MIN_CHUNK_POINT_NUM);
+        new ChunkBuffer(measurementSchema), TSFileConfig.pageCheckSizeThreshold);
   }
 }
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
index 8f1bc59..1909a98 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
@@ -24,14 +24,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.junit.Before;
 
 public class MergePerfTest extends MergeTaskTest{
 
-  private int seqFileNum;
-  private int unseqFileNum;
   private double unseqRatio;
 
   private Random random = new Random(System.currentTimeMillis());
@@ -39,16 +37,8 @@ public class MergePerfTest extends MergeTaskTest{
   private long timeConsumption;
   private boolean fullMerge;
 
-  @Before
   @Override
-  public void setUp() throws IOException, WriteProcessException {
-    tempSGDir = new File("tempSG");
-    tempSGDir.mkdirs();
-    prepareSeries();
-    prepareFiles();
-  }
-
-  private void prepareFiles() throws IOException, WriteProcessException {
+  void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
     for (int i = 0; i < seqFileNum; i++) {
       File file = new File(i + "seq.tsfile");
       TsFileResource tsFileResource = new TsFileResource(file);
@@ -59,7 +49,7 @@ public class MergePerfTest extends MergeTaskTest{
     long unseqLength = (long) (timeRange * unseqRatio);
     for (int i = 0; i < unseqFileNum; i++) {
       long unseqOffset = (long) ((1.0 - unseqRatio) * random.nextDouble() * timeRange);
-      System.out.println(unseqOffset + "  " + unseqLength);
+      //System.out.println(unseqOffset + "  " + unseqLength);
       File file = new File(i + "unseq.tsfile");
       TsFileResource tsFileResource = new TsFileResource(file);
       unseqResources.add(tsFileResource);
@@ -79,6 +69,7 @@ public class MergePerfTest extends MergeTaskTest{
   }
 
   public static void main(String[] args) throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(-1);
 
     List<Long> timeConsumptions = new ArrayList<>();
     MergePerfTest perfTest = new MergePerfTest();
@@ -90,7 +81,7 @@ public class MergePerfTest extends MergeTaskTest{
     perfTest.unseqRatio = 0.2;
     perfTest.ptNum = 10000;
     perfTest.flushInterval = 1000;
-    perfTest.fullMerge = true;
+    perfTest.fullMerge = false;
 
     for (int i = 0; i < 3; i++) {
       // cache warm-up
@@ -117,7 +108,7 @@ public class MergePerfTest extends MergeTaskTest{
 //    }
 //    double[] doubleParameters = new double[10];
 //    for (int i = 1; i <= 10; i++) {
-//      doubleParameters[i-1] = 0.1 * i;
+//      doubleParameters[i-1] = 0.01 * i;
 //    }
 //    for (double param : doubleParameters) {
 //      perfTest.unseqRatio = param;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index d9b3377..fa10f1e 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.reader.sequence.SequenceSeriesReader;
@@ -96,6 +97,28 @@ public class MergeTaskTest extends MergeTest {
   }
 
   @Test
+  public void testChunkNumThreshold() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(Integer.MAX_VALUE);
+    MergeTask mergeTask =
+        new MergeTask(seqResources, unseqResources, tempSGDir.getPath(), (k, v, l) -> {}, "test",
+            false);
+    mergeTask.call();
+
+    QueryContext context = new QueryContext();
+    Path path = new Path(deviceIds[0], measurementSchemas[0].getMeasurementId());
+    SequenceSeriesReader tsFilesReader = new SequenceSeriesReader(path,
+        Collections.singletonList(seqResources.get(0)),
+        null, context);
+    while (tsFilesReader.hasNext()) {
+      BatchData batchData = tsFilesReader.nextBatch();
+      for (int i = 0; i < batchData.length(); i++) {
+        assertEquals(batchData.getTimeByIndex(i) + 20000.0, batchData.getDoubleByIndex(i), 0.001);
+      }
+    }
+    tsFilesReader.close();
+  }
+
+  @Test
   public void testPartialMerge1() throws Exception {
     MergeTask mergeTask =
         new MergeTask(seqResources, unseqResources.subList(0, 1), tempSGDir.getPath(),
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index 8972d9f..4f5fe00 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -38,6 +39,8 @@ import org.junit.Before;
 
 abstract class MergeTest {
 
+  int seqFileNum = 5;
+  int unseqFileNum = 5;
   int measurementNum = 10;
   int deviceNum = 10;
   long ptNum = 100;
@@ -49,10 +52,15 @@ abstract class MergeTest {
   List<TsFileResource> seqResources = new ArrayList<>();
   List<TsFileResource> unseqResources = new ArrayList<>();
 
+  private int prevMergeChunkThreshold;
+
   @Before
   public void setUp() throws IOException, WriteProcessException {
+    prevMergeChunkThreshold =
+        IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold();
+    IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(-1);
     prepareSeries();
-    prepareFiles(5, 5);
+    prepareFiles(seqFileNum, unseqFileNum);
   }
 
   @After
@@ -60,6 +68,7 @@ abstract class MergeTest {
     removeFiles();
     seqResources.clear();
     unseqResources.clear();
+    IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(prevMergeChunkThreshold);
   }
 
   void prepareSeries() {
@@ -74,7 +83,7 @@ abstract class MergeTest {
     }
   }
 
-  private void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
+  void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
     for (int i = 0; i < seqFileNum; i++) {
       File file = new File(i + "seq.tsfile");
       TsFileResource tsFileResource = new TsFileResource(file);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java
index 01cc0ab..dc17641 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java
@@ -173,7 +173,7 @@ public class ChunkGroupMetaData {
   }
 
   public List<ChunkMetaData> getChunkMetaDataList() {
-    return chunkMetaDataList == null ? null : Collections.unmodifiableList(chunkMetaDataList);
+    return chunkMetaDataList;
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index f4750a3..84e1311 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -414,7 +414,7 @@ public class TsFileIOWriter {
         boolean chunkValid = startTimeIdx < pathChunkStartTimes.size()
             && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
         if (!chunkValid) {
-          chunkGroupMetaDataIterator.remove();
+          chunkMetaDataIterator.remove();
           chunkNum--;
           invalidChunkNum++;
         } else {