You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/05/10 11:18:26 UTC

[iotdb] branch master updated: Add continuous compaction in level compaction strategy (#2080)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 819b195  Add continuous compaction in level compaction strategy (#2080)
819b195 is described below

commit 819b19551d34406dade92527bfb36d10501bb6a2
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Mon May 10 19:18:02 2021 +0800

    Add continuous compaction in level compaction strategy (#2080)
    
    * add enable unseq compaction
    
    * add continuous compaction in level compaction strategy
    
    * fix compaction bug
    
    * add load configure
    
    * update continuous compaction to new compaction task
    
    * fix comment
    
    * update config and variable name
    
    * fix format
    
    * fix compile
    
    * resolve conversation
    
    Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
 .../resources/conf/iotdb-engine.properties         |  4 ++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 ++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 13 ++++++
 .../db/engine/compaction/TsFileManagement.java     |  9 +++-
 .../level/LevelCompactionTsFileManagement.java     | 51 ++++++++++++----------
 .../engine/storagegroup/StorageGroupProcessor.java | 30 ++++++++++---
 .../compaction/LevelCompactionCacheTest.java       |  3 +-
 .../engine/compaction/LevelCompactionLogTest.java  |  3 +-
 .../compaction/LevelCompactionMergeTest.java       |  3 +-
 .../compaction/LevelCompactionMoreDataTest.java    |  3 +-
 .../NoCompactionTsFileManagementTest.java          |  4 +-
 .../org/apache/iotdb/db/script/EnvScriptIT.java    |  6 ++-
 12 files changed, 105 insertions(+), 38 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index e7cd8ce..3a7725a 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -374,6 +374,10 @@ timestamp_precision=ms
 # enable_unseq_compaction=true
 
 # Works when the compaction_strategy is LEVEL_COMPACTION.
+# Whether to start the next compaction task automatically after one compaction task finishes
+# enable_continuous_compaction=true
+
+# Works when the compaction_strategy is LEVEL_COMPACTION.
 # The max seq file num of each level.
 # When the num of files in one level exceeds this,
 # the files in this level will merge to one and put to upper level.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a21e783..23a1994 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -322,6 +322,12 @@ public class IoTDBConfig {
   private boolean enableUnseqCompaction = true;
 
   /**
+   * Works when the compaction_strategy is LEVEL_COMPACTION. Whether to start the next compaction
+   * task automatically after one compaction task finishes.
+   */
+  private boolean enableContinuousCompaction = true;
+
+  /**
    * Works when the compaction_strategy is LEVEL_COMPACTION. The max seq file num of each level.
    * When the num of files in one level exceeds this, the files in this level will merge to one and
    * put to upper level.
@@ -1474,6 +1480,14 @@ public class IoTDBConfig {
     this.enableUnseqCompaction = enableUnseqCompaction;
   }
 
+  public boolean isEnableContinuousCompaction() {
+    return enableContinuousCompaction;
+  }
+
+  public void setEnableContinuousCompaction(boolean enableContinuousCompaction) {
+    this.enableContinuousCompaction = enableContinuousCompaction;
+  }
+
   public int getSeqFileNumInEachLevel() {
     return seqFileNumInEachLevel;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 988f865..4590e33 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -321,6 +321,12 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "enable_unseq_compaction", Boolean.toString(conf.isEnableUnseqCompaction()))));
 
+      conf.setEnableContinuousCompaction(
+          Boolean.parseBoolean(
+              properties.getProperty(
+                  "enable_continuous_compaction",
+                  Boolean.toString(conf.isEnableContinuousCompaction()))));
+
       conf.setSeqLevelNum(
           Integer.parseInt(
               properties.getProperty("seq_level_num", Integer.toString(conf.getSeqLevelNum()))));
@@ -1029,6 +1035,13 @@ public class IoTDBDescriptor {
       // update slow_query_threshold
       conf.setSlowQueryThreshold(Long.parseLong(properties.getProperty("slow_query_threshold")));
 
+      // update enable_continuous_compaction
+      conf.setEnableContinuousCompaction(
+          Boolean.parseBoolean(properties.getProperty("enable_continuous_compaction")));
+
+      // update merge_write_throughput_mb_per_sec
+      conf.setMergeWriteThroughputMbPerSec(
+          Integer.parseInt(properties.getProperty("merge_write_throughput_mb_per_sec")));
     } catch (Exception e) {
       throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 3b68e37..a5ea1da 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -72,6 +72,9 @@ public abstract class TsFileManagement {
 
   private long mergeStartTime;
 
+  /** whether a merge was executed in the current task */
+  protected boolean isMergeExecutedInCurrentTask = false;
+
   protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
 
   public TsFileManagement(String storageGroupName, String storageGroupDir) {
@@ -166,7 +169,7 @@ public abstract class TsFileManagement {
     @Override
     public Void call() {
       merge(timePartitionId);
-      closeCompactionMergeCallBack.call();
+      closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask, timePartitionId);
       return null;
     }
   }
@@ -182,7 +185,9 @@ public abstract class TsFileManagement {
     @Override
     public Void call() {
       recover();
-      closeCompactionMergeCallBack.call();
+      // in the recover logic we do not have to start the next compaction task, so the time
+      // partition parameter is unused here and we can just pass 0L
+      closeCompactionMergeCallBack.call(false, 0L);
       return null;
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 61f928d..6bac52a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -564,25 +564,20 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       forkTsFileList(
           forkedSequenceTsFileResources,
           sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
-          seqLevelNum,
-          seqFileNumInEachLevel);
+          seqLevelNum);
       // we have to copy all unseq file
       forkTsFileList(
           forkedUnSequenceTsFileResources,
           unSequenceTsFileResources.computeIfAbsent(
               timePartition, this::newUnSequenceTsFileResources),
-          unseqLevelNum + 1,
-          unseqFileNumInEachLevel);
+          unseqLevelNum + 1);
     } finally {
       readUnLock();
     }
   }
 
   private void forkTsFileList(
-      List<List<TsFileResource>> forkedTsFileResources,
-      List rawTsFileResources,
-      int currMaxLevel,
-      int currFileNumInEachLevel) {
+      List<List<TsFileResource>> forkedTsFileResources, List rawTsFileResources, int currMaxLevel) {
     forkedTsFileResources.clear();
     for (int i = 0; i < currMaxLevel - 1; i++) {
       List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
@@ -591,9 +586,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       for (TsFileResource tsFileResource : levelRawTsFileResources) {
         if (tsFileResource.isClosed()) {
           forkedLevelTsFileResources.add(tsFileResource);
-          if (forkedLevelTsFileResources.size() > currFileNumInEachLevel) {
-            break;
-          }
         }
       }
       forkedTsFileResources.add(forkedLevelTsFileResources);
@@ -602,25 +594,31 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
 
   @Override
   protected void merge(long timePartition) {
-    merge(forkedSequenceTsFileResources, true, timePartition, seqLevelNum, seqFileNumInEachLevel);
-    if (enableUnseqCompaction && unseqLevelNum <= 1 && forkedUnSequenceTsFileResources.size() > 0) {
+    isMergeExecutedInCurrentTask =
+        merge(
+            forkedSequenceTsFileResources, true, timePartition, seqLevelNum, seqFileNumInEachLevel);
+    if (enableUnseqCompaction
+        && unseqLevelNum <= 1
+        && forkedUnSequenceTsFileResources.get(0).size() > 0) {
+      isMergeExecutedInCurrentTask = true;
       merge(
           isForceFullMerge,
           getTsFileListByTimePartition(true, timePartition),
           forkedUnSequenceTsFileResources.get(0),
           Long.MAX_VALUE);
     } else {
-      merge(
-          forkedUnSequenceTsFileResources,
-          false,
-          timePartition,
-          unseqLevelNum,
-          unseqFileNumInEachLevel);
+      isMergeExecutedInCurrentTask =
+          merge(
+              forkedUnSequenceTsFileResources,
+              false,
+              timePartition,
+              unseqLevelNum,
+              unseqFileNumInEachLevel);
     }
   }
 
   @SuppressWarnings("squid:S3776")
-  private void merge(
+  private boolean merge(
       List<List<TsFileResource>> mergeResources,
       boolean sequence,
       long timePartition,
@@ -633,16 +631,21 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       } catch (InterruptedException e) {
         logger.error("{} [Compaction] shutdown", storageGroupName, e);
         Thread.currentThread().interrupt();
-        return;
+        return false;
       }
     }
     isSeqMerging = true;
     long startTimeMillis = System.currentTimeMillis();
+    // whether a merge is executed in the loop below
+    boolean isMergeExecutedInCurrentTask = false;
     CompactionLogger compactionLogger = null;
     try {
       logger.info("{} start to filter compaction condition", storageGroupName);
       for (int i = 0; i < currMaxLevel - 1; i++) {
-        if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
+        List<TsFileResource> currLevelTsFileResource = mergeResources.get(i);
+        if (currMaxFileNumInEachLevel <= currLevelTsFileResource.size()) {
+          // just merge part of the files in this level
+          isMergeExecutedInCurrentTask = true;
           // level is numbered from 0
           if (enableUnseqCompaction && !sequence && i == currMaxLevel - 2) {
             // do not merge current unseq file level to upper level and just merge all of them to
@@ -663,7 +666,8 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
                 TsFileResource.modifyTsFileNameMergeCnt(mergeResources.get(i).get(0).getTsFile());
             compactionLogger.logSequence(sequence);
             compactionLogger.logFile(TARGET_NAME, newLevelFile);
-            List<TsFileResource> toMergeTsFiles = mergeResources.get(i);
+            List<TsFileResource> toMergeTsFiles =
+                mergeResources.get(i).subList(0, currMaxFileNumInEachLevel);
             logger.info(
                 "{} [Compaction] merge level-{}'s {} TsFiles to next level",
                 storageGroupName,
@@ -740,6 +744,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           sequence,
           System.currentTimeMillis() - startTimeMillis);
     }
+    return isMergeExecutedInCurrentTask;
   }
 
   private List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 3cea096..2e0ac5c 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -494,7 +494,8 @@ public class StorageGroupProcessor {
       throw new StorageGroupProcessorException(e);
     }
 
-    for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
+    List<TsFileResource> seqTsFileResources = tsFileManagement.getTsFileList(true);
+    for (TsFileResource resource : seqTsFileResources) {
       long timePartitionId = resource.getTimePartition();
       Map<String, Long> endTimeMap = new HashMap<>();
       for (String deviceId : resource.getDevices()) {
@@ -509,20 +510,30 @@ public class StorageGroupProcessor {
           .putAll(endTimeMap);
       globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
     }
+
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
+        && seqTsFileResources.size() > 0) {
+      for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
+        executeCompaction(
+            timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+      }
+    }
   }
 
   private void recoverCompaction() {
     if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
       compactionMergeWorking = true;
       logger.info(
-          "{} - {} submit a compaction merge task", logicalStorageGroupName, virtualStorageGroupId);
+          "{} - {} submit a compaction recover merge task",
+          logicalStorageGroupName,
+          virtualStorageGroupId);
       try {
         CompactionMergeTaskPoolManager.getInstance()
             .submitTask(
                 logicalStorageGroupName,
                 tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
       } catch (RejectedExecutionException e) {
-        this.closeCompactionMergeCallBack();
+        this.closeCompactionMergeCallBack(false, 0);
         logger.error(
             "{} - {} compaction submit task failed",
             logicalStorageGroupName,
@@ -1979,7 +1990,7 @@ public class StorageGroupProcessor {
                 tsFileManagement
                 .new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
       } catch (IOException | RejectedExecutionException e) {
-        this.closeCompactionMergeCallBack();
+        this.closeCompactionMergeCallBack(false, timePartition);
         logger.error(
             "{} compaction submit task failed",
             logicalStorageGroupName + "-" + virtualStorageGroupId,
@@ -1993,8 +2004,13 @@ public class StorageGroupProcessor {
   }
 
   /** close compaction merge callback, to release some locks */
-  private void closeCompactionMergeCallBack() {
-    this.compactionMergeWorking = false;
+  private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
+    if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
+      executeCompaction(
+          timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+    } else {
+      this.compactionMergeWorking = false;
+    }
   }
 
   /**
@@ -2961,7 +2977,7 @@ public class StorageGroupProcessor {
   @FunctionalInterface
   public interface CloseCompactionMergeCallBack {
 
-    void call();
+    void call(boolean isMergeExecutedInCurrentTask, long timePartitionId);
   }
 
   @FunctionalInterface
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
index 46c2771..d632cb9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
@@ -120,7 +120,8 @@ public class LevelCompactionCacheTest extends LevelCompactionTest {
   }
 
   /** close compaction merge callback, to release some locks */
-  private void closeCompactionMergeCallBack() {
+  private void closeCompactionMergeCallBack(
+      boolean isMergeExecutedInCurrentTask, long timePartitionId) {
     this.compactionMergeWorking = false;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
index d6d9c99..2170fbd 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
@@ -80,7 +80,8 @@ public class LevelCompactionLogTest extends LevelCompactionTest {
   }
 
   /** close compaction merge callback, to release some locks */
-  private void closeCompactionMergeCallBack() {
+  private void closeCompactionMergeCallBack(
+      boolean isMergeExecutedInCurrentTask, long timePartitionId) {
     this.compactionMergeWorking = false;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 3d9aecd..9b3984b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -226,7 +226,8 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
   }
 
   /** close compaction merge callback, to release some locks */
-  private void closeCompactionMergeCallBack() {
+  private void closeCompactionMergeCallBack(
+      boolean isMergeExecutedInCurrentTask, long timePartitionId) {
     this.compactionMergeWorking = false;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
index 1411eac..75e576b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
@@ -232,7 +232,8 @@ public class LevelCompactionMoreDataTest extends LevelCompactionTest {
   }
 
   /** close compaction merge callback, to release some locks */
-  private void closeCompactionMergeCallBack() {
+  private void closeCompactionMergeCallBack(
+      boolean isMergeExecutedInCurrentTask, long timePartitionId) {
     this.compactionMergeWorking = false;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
index 6407c9e..8ad9f18 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
@@ -153,7 +153,9 @@ public class NoCompactionTsFileManagementTest extends LevelCompactionTest {
     noCompactionTsFileManagement.forkCurrentFileList(0);
     noCompactionTsFileManagement.recover();
     CompactionMergeTask compactionMergeTask =
-        noCompactionTsFileManagement.new CompactionMergeTask(() -> {}, 0);
+        noCompactionTsFileManagement
+        .new CompactionMergeTask(
+            (boolean isMergeExecutedInCurrentTask, long timePartitionId) -> {}, 0);
     compactionMergeTask.call();
     assertEquals(1, noCompactionTsFileManagement.size(true));
     assertEquals(1, noCompactionTsFileManagement.size(false));
diff --git a/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java b/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
index ef8961a..4051f06 100644
--- a/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/script/EnvScriptIT.java
@@ -22,7 +22,11 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;