You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/01 12:22:44 UTC

[iotdb] 01/01: clean merge code

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch clear_merge_code
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 20ce1dcb4860839a3509bde11e8e0467e2612301
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 1 20:21:59 2021 +0800

    clean merge code
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  10 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   4 +-
 ...PoolManager.java => CompactionTaskManager.java} |  12 +-
 .../db/engine/compaction/TsFileManagement.java     | 109 +++---------
 .../level/LevelCompactionTsFileManagement.java     | 191 +++++++++------------
 .../iotdb/db/engine/merge/manage/MergeManager.java |   2 +-
 .../db/engine/merge/manage/MergeResource.java      |  48 ++++--
 .../iotdb/db/engine/merge/recover/LogAnalyzer.java |   7 +-
 .../engine/merge/selector/IMergeFileSelector.java  |   4 +
 .../iotdb/db/engine/merge/task/MergeTask.java      | 183 ++++++++++----------
 .../db/engine/merge/task/RecoverMergeTask.java     |   1 +
 .../engine/storagegroup/StorageGroupProcessor.java |  52 +++---
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   4 +-
 .../apache/iotdb/db/engine/merge/MergeLogTest.java |   3 +-
 .../iotdb/db/engine/merge/MergeManagerTest.java    |  11 +-
 .../iotdb/db/engine/merge/MergeOverLapTest.java    |   3 +-
 .../iotdb/db/engine/merge/MergePerfTest.java       |   4 +-
 .../iotdb/db/engine/merge/MergeTaskTest.java       |  33 ++--
 .../storagegroup/StorageGroupProcessorTest.java    |   2 +-
 .../iotdb/db/integration/IoTDBRestartIT.java       |   4 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   4 +-
 21 files changed, 302 insertions(+), 389 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6f9498b..61b8997 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -519,7 +519,7 @@ public class IoTDBConfig {
    * despite how much they are overflowed). This may increase merge overhead depending on how much
    * the SeqFiles are overflowed.
    */
-  private boolean forceFullMerge = true;
+  private boolean fullMerge = true;
 
   /** The limit of compaction merge can reach per second */
   private int mergeWriteThroughputMbPerSec = 8;
@@ -1404,12 +1404,12 @@ public class IoTDBConfig {
     this.enablePartialInsert = enablePartialInsert;
   }
 
-  public boolean isForceFullMerge() {
-    return forceFullMerge;
+  public boolean isFullMerge() {
+    return fullMerge;
   }
 
-  void setForceFullMerge(boolean forceFullMerge) {
-    this.forceFullMerge = forceFullMerge;
+  void setFullMerge(boolean fullMerge) {
+    this.fullMerge = fullMerge;
   }
 
   public int getCompactionThreadNum() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 645a534..6077a3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -479,10 +479,10 @@ public class IoTDBDescriptor {
           Long.parseLong(
               properties.getProperty(
                   "merge_interval_sec", Long.toString(conf.getMergeIntervalSec()))));
-      conf.setForceFullMerge(
+      conf.setFullMerge(
           Boolean.parseBoolean(
               properties.getProperty(
-                  "force_full_merge", Boolean.toString(conf.isForceFullMerge()))));
+                  "force_full_merge", Boolean.toString(conf.isFullMerge()))));
       conf.setCompactionThreadNum(
           Integer.parseInt(
               properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index cdf6f4a..53dc9cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -46,17 +46,17 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
 
-/** CompactionMergeTaskPoolManager provides a ThreadPool to queue and run all compaction tasks. */
-public class CompactionMergeTaskPoolManager implements IService {
+/** CompactionTaskManager provides a ThreadPool to queue and run all compaction tasks. */
+public class CompactionTaskManager implements IService {
 
   private static final Logger logger =
-      LoggerFactory.getLogger(CompactionMergeTaskPoolManager.class);
-  private static final CompactionMergeTaskPoolManager INSTANCE =
-      new CompactionMergeTaskPoolManager();
+      LoggerFactory.getLogger(CompactionTaskManager.class);
+  private static final CompactionTaskManager INSTANCE =
+      new CompactionTaskManager();
   private ExecutorService pool;
   private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
 
-  public static CompactionMergeTaskPoolManager getInstance() {
+  public static CompactionTaskManager getInstance() {
     return INSTANCE;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index aea6e8b..439c50d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.engine.compaction;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
-import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
 import org.apache.iotdb.db.engine.merge.selector.MaxFileMergeFileSelector;
@@ -33,7 +32,6 @@ import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseCompactionMergeCallBack;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.MergeException;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import org.slf4j.Logger;
@@ -62,20 +60,16 @@ public abstract class TsFileManagement {
   /** Serialize queries, delete resource files, compaction cleanup files */
   private final ReadWriteLock compactionMergeLock = new ReentrantReadWriteLock();
 
-  public volatile boolean isUnseqMerging = false;
-  public volatile boolean isSeqMerging = false;
   /**
    * This is the modification file of the result of the current merge. Because the merged file may
    * be invisible at this moment, without this, deletion/update during merge could be lost.
    */
   public ModificationFile mergingModification;
 
-  private long mergeStartTime;
-
   /** whether execute merge chunk in this task */
   protected boolean isMergeExecutedInCurrentTask = false;
 
-  protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
+  protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isFullMerge();
   private final int maxOpenFileNumInEachUnseqCompaction =
       IoTDBDescriptor.getInstance().getConfig().getMaxOpenFileNumInEachUnseqCompaction();
 
@@ -84,7 +78,7 @@ public abstract class TsFileManagement {
     this.storageGroupDir = storageGroupDir;
   }
 
-  public void setForceFullMerge(boolean forceFullMerge) {
+  public void setFullMerge(boolean forceFullMerge) {
     isForceFullMerge = forceFullMerge;
   }
 
@@ -176,11 +170,11 @@ public abstract class TsFileManagement {
     }
   }
 
-  public class CompactionRecoverTask implements Callable<Void> {
+  public class LevelCompactionRecoverTask implements Callable<Void> {
 
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
 
-    public CompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+    public LevelCompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
       this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
     }
 
@@ -194,107 +188,63 @@ public abstract class TsFileManagement {
     }
   }
 
-  public synchronized void merge(
+  protected void doUnseqMerge(
       boolean fullMerge,
       List<TsFileResource> seqMergeList,
-      List<TsFileResource> unSeqMergeList,
-      long dataTTL) {
-    if (isUnseqMerging) {
-      if (logger.isInfoEnabled()) {
-        logger.info(
-            "{} Last merge is ongoing, currently consumed time: {}ms",
-            storageGroupName,
-            (System.currentTimeMillis() - mergeStartTime));
-      }
-      return;
-    }
-    // wait until seq merge has finished
-    while (isSeqMerging) {
-      try {
-        wait(200);
-      } catch (InterruptedException e) {
-        logger.error("{} [Compaction] shutdown", storageGroupName, e);
-        Thread.currentThread().interrupt();
-        return;
-      }
-    }
-    isUnseqMerging = true;
-
-    if (seqMergeList.isEmpty()) {
-      logger.info("{} no seq files to be merged", storageGroupName);
-      isUnseqMerging = false;
-      return;
-    }
+      List<TsFileResource> unSeqMergeList) {
 
-    if (unSeqMergeList.isEmpty()) {
-      logger.info("{} no unseq files to be merged", storageGroupName);
-      isUnseqMerging = false;
+    if (seqMergeList.isEmpty() || unSeqMergeList.isEmpty()) {
+      logger.info("{} no files to be merged, seqFiles={}, unseqFiles={}",
+          storageGroupName, seqMergeList.size(), unSeqMergeList.size());
       return;
     }
 
+    // the number of unseq files in one merge should not exceed maxOpenFileNumInEachUnseqCompaction
     if (unSeqMergeList.size() > maxOpenFileNumInEachUnseqCompaction) {
-      logger.info(
-          "{} too much unseq files to be merged, reduce it to {}",
-          storageGroupName,
-          maxOpenFileNumInEachUnseqCompaction);
       unSeqMergeList = unSeqMergeList.subList(0, maxOpenFileNumInEachUnseqCompaction);
     }
 
-    long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
-    long timeLowerBound = System.currentTimeMillis() - dataTTL;
+    long timeLowerBound = System.currentTimeMillis() - IoTDBDescriptor.getInstance().getConfig().getDefaultTTL();
     MergeResource mergeResource = new MergeResource(seqMergeList, unSeqMergeList, timeLowerBound);
 
-    IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource);
+    IMergeFileSelector fileSelector = getMergeFileSelector(mergeResource);
     try {
       List[] mergeFiles = fileSelector.select();
       if (mergeFiles.length == 0) {
         logger.info(
-            "{} cannot select merge candidates under the budget {}", storageGroupName, budget);
-        isUnseqMerging = false;
+            "{} cannot select merge candidates under the budget {}",
+            storageGroupName, IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget());
         return;
       }
-      // avoid pending tasks holds the metadata and streams
-      mergeResource.clear();
-      String taskName = storageGroupName + "-" + System.currentTimeMillis();
-      // do not cache metadata until true candidates are chosen, or too much metadata will be
-      // cached during selection
-      mergeResource.setCacheDeviceMeta(true);
-
-      for (TsFileResource tsFileResource : mergeResource.getSeqFiles()) {
-        tsFileResource.setMerging(true);
-      }
-      for (TsFileResource tsFileResource : mergeResource.getUnseqFiles()) {
-        tsFileResource.setMerging(true);
-      }
 
-      mergeStartTime = System.currentTimeMillis();
+      mergeResource.startMerging();
+
       MergeTask mergeTask =
           new MergeTask(
               mergeResource,
               storageGroupDir,
               this::mergeEndAction,
-              taskName,
               fullMerge,
               fileSelector.getConcurrentMergeNum(),
               storageGroupName);
+
       mergingModification =
           new ModificationFile(storageGroupDir + File.separator + MERGING_MODIFICATION_FILE_NAME);
-      MergeManager.getINSTANCE().submitMainTask(mergeTask);
-      if (logger.isInfoEnabled()) {
-        logger.info(
-            "{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
-            storageGroupName,
-            taskName,
-            mergeFiles[0].size(),
-            mergeFiles[1].size());
-      }
 
-    } catch (MergeException | IOException e) {
+      logger.info(
+          "{} start merge {} seqFiles, {} unseqFiles", storageGroupName,
+          mergeFiles[0].size(),
+          mergeFiles[1].size());
+
+      mergeTask.doMerge();
+
+    } catch (Exception e) {
       logger.error("{} cannot select file for merge", storageGroupName, e);
     }
   }
 
-  private IMergeFileSelector getMergeFileSelector(long budget, MergeResource resource) {
+  private IMergeFileSelector getMergeFileSelector(MergeResource resource) {
+    long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
     MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
     switch (strategy) {
       case MAX_FILE_NUM:
@@ -402,14 +352,12 @@ public abstract class TsFileManagement {
 
     if (Thread.currentThread().isInterrupted() || unseqFiles.isEmpty()) {
       // merge task abort, or merge runtime exception arose, just end this merge
-      isUnseqMerging = false;
       logger.info("{} a merge task abnormally ends", storageGroupName);
       return;
     }
     removeUnseqFiles(unseqFiles);
 
-    for (int i = 0; i < seqFiles.size(); i++) {
-      TsFileResource seqFile = seqFiles.get(i);
+    for (TsFileResource seqFile : seqFiles) {
       // get both seqFile lock and merge lock
       doubleWriteLock(seqFile);
 
@@ -430,7 +378,6 @@ public abstract class TsFileManagement {
 
     try {
       removeMergingModification();
-      isUnseqMerging = false;
       Files.delete(mergeLog.toPath());
     } catch (IOException e) {
       logger.error(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 98f1c2e..0b6730d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -64,7 +64,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   private static final Logger logger =
       LoggerFactory.getLogger(LevelCompactionTsFileManagement.class);
 
-  private final int seqLevelNum =
+  private final int totalSeqLevelNum =
       Math.max(IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum(), 1);
   private final int seqFileNumInEachLevel =
       Math.max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
@@ -267,7 +267,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       long timePartitionId = tsFileResource.getTimePartition();
       int level = getMergeLevel(tsFileResource.getTsFile());
       if (sequence) {
-        if (level <= seqLevelNum - 1) {
+        if (level <= totalSeqLevelNum - 1) {
           // current file has normal level
           sequenceTsFileResources
               .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
@@ -277,7 +277,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           // current file has too high level
           sequenceTsFileResources
               .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
-              .get(seqLevelNum - 1)
+              .get(totalSeqLevelNum - 1)
               .add(tsFileResource);
         }
       } else {
@@ -397,7 +397,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       if (sequence) {
         for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
             sequenceTsFileResources.values()) {
-          for (int i = seqLevelNum - 1; i >= 0; i--) {
+          for (int i = totalSeqLevelNum - 1; i >= 0; i--) {
             result += partitionSequenceTsFileResource.get(i).size();
           }
         }
@@ -564,7 +564,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       forkTsFileList(
           forkedSequenceTsFileResources,
           sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
-          seqLevelNum);
+          totalSeqLevelNum);
       // we have to copy all unseq file
       forkTsFileList(
           forkedUnSequenceTsFileResources,
@@ -594,47 +594,28 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
 
   @Override
   protected void merge(long timePartition) {
-    isMergeExecutedInCurrentTask =
-        merge(
-            forkedSequenceTsFileResources, true, timePartition, seqLevelNum, seqFileNumInEachLevel);
-    if (enableUnseqCompaction
-        && unseqLevelNum <= 1
-        && forkedUnSequenceTsFileResources.get(0).size() > 0) {
-      isMergeExecutedInCurrentTask = true;
-      merge(
-          isForceFullMerge,
-          getTsFileListByTimePartition(true, timePartition),
-          forkedUnSequenceTsFileResources.get(0),
-          Long.MAX_VALUE);
-    } else {
-      isMergeExecutedInCurrentTask =
-          merge(
-              forkedUnSequenceTsFileResources,
-              false,
-              timePartition,
-              unseqLevelNum,
-              unseqFileNumInEachLevel);
+    // do unseq compaction if there is an unseq file in the max unseq level
+    if (enableUnseqCompaction && forkedUnSequenceTsFileResources.get(unseqLevelNum - 1).size() > 0) {
+      doUnseqMerge(isForceFullMerge, getTsFileListByTimePartition(true, timePartition),
+          forkedUnSequenceTsFileResources.get(unseqLevelNum - 1));
     }
+
+    doLevelCompaction(forkedSequenceTsFileResources, true, timePartition, totalSeqLevelNum,
+        seqFileNumInEachLevel);
+
+    doLevelCompaction(forkedUnSequenceTsFileResources, false, timePartition, unseqLevelNum,
+        unseqFileNumInEachLevel);
+
   }
 
-  @SuppressWarnings("squid:S3776")
-  private boolean merge(
-      List<List<TsFileResource>> mergeResources,
+  @SuppressWarnings("squid:S3776") //MERGE TODO: move to a LevelCompactionExecutor
+  private boolean doLevelCompaction(
+      List<List<TsFileResource>> mergeResources, // each level is a List<TsFileResource>
       boolean sequence,
       long timePartition,
       int currMaxLevel,
       int currMaxFileNumInEachLevel) {
     // wait until unseq merge has finished
-    while (isUnseqMerging) {
-      try {
-        Thread.sleep(200);
-      } catch (InterruptedException e) {
-        logger.error("{} [Compaction] shutdown", storageGroupName, e);
-        Thread.currentThread().interrupt();
-        return false;
-      }
-    }
-    isSeqMerging = true;
     long startTimeMillis = System.currentTimeMillis();
     // whether execute merge chunk in the loop below
     boolean isMergeExecutedInCurrentTask = false;
@@ -646,83 +627,72 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         if (currMaxFileNumInEachLevel <= currLevelTsFileResource.size()) {
           // just merge part of the file
           isMergeExecutedInCurrentTask = true;
-          // level is numbered from 0
-          if (enableUnseqCompaction && !sequence && i == currMaxLevel - 2) {
-            // do not merge current unseq file level to upper level and just merge all of them to
-            // seq file
-            isSeqMerging = false;
-            merge(
-                isForceFullMerge,
-                getTsFileListByTimePartition(true, timePartition),
-                mergeResources.get(i),
-                Long.MAX_VALUE);
-          } else {
-            compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
-            // log source file list and target file for recover
-            for (TsFileResource mergeResource : mergeResources.get(i)) {
-              compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
-            }
-            File newLevelFile =
-                TsFileResource.modifyTsFileNameMergeCnt(mergeResources.get(i).get(0).getTsFile());
-            compactionLogger.logSequence(sequence);
-            compactionLogger.logFile(TARGET_NAME, newLevelFile);
-            List<TsFileResource> toMergeTsFiles =
-                mergeResources.get(i).subList(0, currMaxFileNumInEachLevel);
+          compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
+          // log source file list and target file for recover
+          for (TsFileResource mergeResource : mergeResources.get(i)) {
+            compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
+          }
+          File newLevelFile =
+              TsFileResource.modifyTsFileNameMergeCnt(mergeResources.get(i).get(0).getTsFile());
+          compactionLogger.logSequence(sequence);
+          compactionLogger.logFile(TARGET_NAME, newLevelFile);
+          List<TsFileResource> toMergeTsFiles =
+              mergeResources.get(i).subList(0, currMaxFileNumInEachLevel);
+          logger.info(
+              "{} [Compaction] merge level-{}'s {} TsFiles to next level",
+              storageGroupName,
+              i,
+              toMergeTsFiles.size());
+          for (TsFileResource toMergeTsFile : toMergeTsFiles) {
             logger.info(
-                "{} [Compaction] merge level-{}'s {} TsFiles to next level",
-                storageGroupName,
-                i,
-                toMergeTsFiles.size());
-            for (TsFileResource toMergeTsFile : toMergeTsFiles) {
-              logger.info(
-                  "{} [Compaction] start to merge TsFile {}", storageGroupName, toMergeTsFile);
-            }
+                "{} [Compaction] start to merge TsFile {}", storageGroupName, toMergeTsFile);
+          }
 
-            TsFileResource newResource = new TsFileResource(newLevelFile);
-            List<Modification> modifications = new ArrayList<>();
-            // merge, read from source files and write to target file
-            CompactionUtils.merge(
-                newResource,
-                toMergeTsFiles,
-                storageGroupName,
-                compactionLogger,
-                new HashSet<>(),
-                sequence,
-                modifications);
-            logger.info(
-                "{} [Compaction] merged level-{}'s {} TsFiles to next level, and start to delete old files",
-                storageGroupName,
-                i,
-                toMergeTsFiles.size());
-            writeLock();
-            try {
-              if (Thread.currentThread().isInterrupted()) {
-                throw new InterruptedException(
-                    String.format("%s [Compaction] abort", storageGroupName));
-              }
+          TsFileResource newResource = new TsFileResource(newLevelFile);
+          List<Modification> modifications = new ArrayList<>();
+          // merge, read from source files and write to target file
+          CompactionUtils.merge(
+              newResource,
+              toMergeTsFiles,
+              storageGroupName,
+              compactionLogger,
+              new HashSet<>(),
+              sequence,
+              modifications);
+          logger.info(
+              "{} [Compaction] merged level-{}'s {} TsFiles to next level, and start to delete old files",
+              storageGroupName,
+              i,
+              toMergeTsFiles.size());
+          writeLock();
+          try {
+            if (Thread.currentThread().isInterrupted()) {
+              throw new InterruptedException(
+                  String.format("%s [Compaction] abort", storageGroupName));
+            }
 
-              if (sequence) {
-                sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
-              } else {
-                unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
-              }
-              deleteLevelFilesInList(timePartition, toMergeTsFiles, i, sequence);
-              if (mergeResources.size() > i + 1) {
-                mergeResources.get(i + 1).add(newResource);
-              }
-            } finally {
-              writeUnlock();
+            if (sequence) {
+              sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
+            } else {
+              unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
             }
-            deleteLevelFilesInDisk(toMergeTsFiles);
-            renameLevelFilesMods(modifications, toMergeTsFiles, newResource);
-            compactionLogger.close();
-            File logFile =
-                FSFactoryProducer.getFSFactory()
-                    .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
-            if (logFile.exists()) {
-              Files.delete(logFile.toPath());
+            deleteLevelFilesInList(timePartition, toMergeTsFiles, i, sequence);
+            if (mergeResources.size() > i + 1) {
+              mergeResources.get(i + 1).add(newResource);
             }
+          } finally {
+            writeUnlock();
+          }
+          deleteLevelFilesInDisk(toMergeTsFiles);
+          renameLevelFilesMods(modifications, toMergeTsFiles, newResource);
+          compactionLogger.close();
+          File logFile =
+              FSFactoryProducer.getFSFactory()
+                  .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
+          if (logFile.exists()) {
+            Files.delete(logFile.toPath());
           }
+          break;
         }
       }
     } catch (Exception e) {
@@ -736,7 +706,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       restoreCompaction();
       logger.error("Error occurred in Compaction Merge thread", e);
     } finally {
-      isSeqMerging = false;
       // reset the merge working state to false
       logger.info(
           "{} [Compaction] merge end time isSeq = {}, consumption: {} ms",
@@ -749,7 +718,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
 
   private List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) {
     List<SortedSet<TsFileResource>> newSequenceTsFileResources = new ArrayList<>();
-    for (int i = 0; i < seqLevelNum; i++) {
+    for (int i = 0; i < totalSeqLevelNum; i++) {
       newSequenceTsFileResources.add(
           new TreeSet<>(
               (o1, o2) -> {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index f9112e1..84c6859 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -253,7 +253,7 @@ public class MergeManager implements IService, MergeManagerMBean {
   private void mergeAll() {
     try {
       StorageEngine.getInstance()
-          .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+          .mergeAll(IoTDBDescriptor.getInstance().getConfig().isFullMerge());
     } catch (StorageEngineException e) {
       logger.error("Cannot perform a global merge because", e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index d7bc827..050d83e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.reader.resource.CachedUnseqResourceMergeReader;
 import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -91,26 +92,27 @@ public class MergeResource {
     this.unseqFiles = unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
   }
 
-  public void clear() throws IOException {
-    for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
-      sequenceReader.close();
-    }
-    for (RestorableTsFileIOWriter writer : fileWriterCache.values()) {
-      writer.close();
-    }
-
-    fileReaderCache.clear();
-    fileWriterCache.clear();
-    modificationCache.clear();
-    measurementSchemaMap.clear();
-    chunkWriterCache.clear();
-  }
-
   public IMeasurementSchema getSchema(PartialPath path) {
     return measurementSchemaMap.get(path);
   }
 
   /**
+   * Called after file selection: flags every seq and unseq file of this resource as merging.
+   *
+   * <p>Device metadata caching is enabled only here, once the true candidates are chosen;
+   * otherwise too much metadata would be cached during selection.
+   */
+  public void startMerging() {
+    cacheDeviceMeta = true;
+    for (TsFileResource tsFileResource : seqFiles) {
+      tsFileResource.setMerging(true);
+    }
+    for (TsFileResource tsFileResource : unseqFiles) {
+      tsFileResource.setMerging(true);
+    }
+  }
+
+  /**
    * Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a SeqFile.
    * The path of the merge temp file will be the seqFile's + ".merge".
    *
@@ -290,4 +292,20 @@ public class MergeResource {
   public Map<String, Pair<Long, Long>> getStartEndTime(TsFileResource tsFileResource) {
     return startEndTimeCache.getOrDefault(tsFileResource, new HashMap<>());
   }
+
+  @TestOnly
+  public void clear() throws IOException {
+    for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
+      sequenceReader.close();
+    }
+    for (RestorableTsFileIOWriter writer : fileWriterCache.values()) {
+      writer.close();
+    }
+
+    fileReaderCache.clear();
+    fileWriterCache.clear();
+    modificationCache.clear();
+    measurementSchemaMap.clear();
+    chunkWriterCache.clear();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
index 9693d3c..e6f84fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
@@ -78,6 +78,7 @@ public class LogAnalyzer {
 
   private Status status;
 
+  // MERGE TODO: add two methods: List<String> getSeqTsFiles()  List<String> getUnseqTsFiles()
   public LogAnalyzer(
       MergeResource resource, String taskName, File logFile, String storageGroupName) {
     this.resource = resource;
@@ -102,10 +103,11 @@ public class LogAnalyzer {
 
         analyzeUnseqFiles(bufferedReader);
 
-        List<PartialPath> storageGroupPaths =
+        // get all timeseries in this storage group
+        List<PartialPath> timeseriesPaths =
             IoTDB.metaManager.getAllTimeseriesPath(new PartialPath(storageGroupName + ".*"));
         unmergedPaths = new ArrayList<>();
-        unmergedPaths.addAll(storageGroupPaths);
+        unmergedPaths.addAll(timeseriesPaths);
 
         analyzeMergedSeries(bufferedReader, unmergedPaths);
 
@@ -120,6 +122,7 @@ public class LogAnalyzer {
       return;
     }
     long startTime = System.currentTimeMillis();
+    // seq files read from the log, line by line, until the STR_UNSEQ_FILES marker is reached
     List<TsFileResource> mergeSeqFiles = new ArrayList<>();
     while ((currLine = bufferedReader.readLine()) != null) {
       if (STR_UNSEQ_FILES.equals(currLine)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java
index 077a58b..63fc04c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java
@@ -29,6 +29,10 @@ import java.util.List;
  */
 public interface IMergeFileSelector {
 
+  /**
+   * @return an array holding the selected seqFileList followed by the selected unseqFileList
+   * @throws MergeException if the file selection fails
+   */
   List[] select() throws MergeException;
 
   int getConcurrentMergeNum();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 381651c..89f4574 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -51,7 +51,7 @@ import java.util.concurrent.Callable;
  * merged chunks in the temp files back to the seqFiles or move the unmerged chunks in the seqFiles
  * into temp files and replace the seqFiles with the temp files. 3. remove unseqFiles
  */
-public class MergeTask implements Callable<Void> {
+public class MergeTask {
 
   public static final String MERGE_SUFFIX = ".merge";
   private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
@@ -67,7 +67,7 @@ public class MergeTask implements Callable<Void> {
   States states = States.START;
   MergeMultiChunkTask chunkTask;
   MergeFileTask fileTask;
-  private MergeCallback callback;
+  protected MergeCallback callback;
 
   MergeTask(
       List<TsFileResource> seqFiles,
@@ -90,29 +90,18 @@ public class MergeTask implements Callable<Void> {
       MergeResource mergeResource,
       String storageGroupSysDir,
       MergeCallback callback,
-      String taskName,
       boolean fullMerge,
       int concurrentMergeSeriesNum,
       String storageGroupName) {
     this.resource = mergeResource;
     this.storageGroupSysDir = storageGroupSysDir;
     this.callback = callback;
-    this.taskName = taskName;
+    this.taskName = storageGroupName;
     this.fullMerge = fullMerge;
     this.concurrentMergeSeriesNum = concurrentMergeSeriesNum;
     this.storageGroupName = storageGroupName;
   }
 
-  @Override
-  public Void call() throws Exception {
-    try {
-      doMerge();
-    } catch (Exception e) {
-      logger.error("Runtime exception in merge {}", taskName, e);
-      abort();
-    }
-    return null;
-  }
 
   private void abort() throws IOException {
     states = States.ABORTED;
@@ -125,101 +114,103 @@ public class MergeTask implements Callable<Void> {
         new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME));
   }
 
-  private void doMerge() throws IOException, MetadataException {
-    if (resource.getSeqFiles().isEmpty()) {
-      logger.info("{} no sequence file to merge into, so will abort task.", taskName);
-      abort();
-      return;
-    }
-    if (logger.isInfoEnabled()) {
-      logger.info(
-          "{} starts to merge {} seqFiles, {} unseqFiles",
-          taskName,
-          resource.getSeqFiles().size(),
-          resource.getUnseqFiles().size());
-    }
-    long startTime = System.currentTimeMillis();
-    long totalFileSize =
-        MergeUtils.collectFileSizes(resource.getSeqFiles(), resource.getUnseqFiles());
-    mergeLogger = new MergeLogger(storageGroupSysDir);
+  public void doMerge() throws IOException {
+    try {
+      if (resource.getSeqFiles().isEmpty()) {
+        logger.info("{} no sequence file to merge into, so will abort task.", taskName);
+        abort();
+        return;
+      }
+      if (logger.isInfoEnabled()) {
+        logger.info(
+            "{} starts to merge {} seqFiles, {} unseqFiles",
+            taskName,
+            resource.getSeqFiles().size(),
+            resource.getUnseqFiles().size());
+      }
+      long startTime = System.currentTimeMillis();
+      long totalFileSize =
+          MergeUtils.collectFileSizes(resource.getSeqFiles(), resource.getUnseqFiles());
+      mergeLogger = new MergeLogger(storageGroupSysDir);
 
-    mergeLogger.logFiles(resource);
+      mergeLogger.logFiles(resource);
 
-    Set<PartialPath> devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName));
-    Map<PartialPath, IMeasurementSchema> measurementSchemaMap = new HashMap<>();
-    List<PartialPath> unmergedSeries = new ArrayList<>();
-    for (PartialPath device : devices) {
-      MNode deviceNode = IoTDB.metaManager.getNodeByPath(device);
-      // todo add template merge logic
-      for (Entry<String, MNode> entry : deviceNode.getChildren().entrySet()) {
-        PartialPath path = device.concatNode(entry.getKey());
-        measurementSchemaMap.put(path, ((MeasurementMNode) entry.getValue()).getSchema());
-        unmergedSeries.add(path);
+      Set<PartialPath> devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName));
+      Map<PartialPath, IMeasurementSchema> measurementSchemaMap = new HashMap<>();
+      List<PartialPath> unmergedSeries = new ArrayList<>();
+      for (PartialPath device : devices) {
+        MNode deviceNode = IoTDB.metaManager.getNodeByPath(device);
+        // todo add template merge logic
+        for (Entry<String, MNode> entry : deviceNode.getChildren().entrySet()) {
+          PartialPath path = device.concatNode(entry.getKey());
+          measurementSchemaMap.put(path, ((MeasurementMNode) entry.getValue()).getSchema());
+          unmergedSeries.add(path);
+        }
       }
-    }
-    resource.setMeasurementSchemaMap(measurementSchemaMap);
+      resource.setMeasurementSchemaMap(measurementSchemaMap);
 
-    mergeLogger.logMergeStart();
+      mergeLogger.logMergeStart();
 
-    chunkTask =
-        new MergeMultiChunkTask(
-            mergeContext,
-            taskName,
-            mergeLogger,
-            resource,
-            fullMerge,
-            unmergedSeries,
-            concurrentMergeSeriesNum,
-            storageGroupName);
-    states = States.MERGE_CHUNKS;
-    chunkTask.mergeSeries();
-    if (Thread.interrupted()) {
-      logger.info("Merge task {} aborted", taskName);
-      abort();
-      return;
-    }
+      chunkTask =
+          new MergeMultiChunkTask(
+              mergeContext,
+              taskName,
+              mergeLogger,
+              resource,
+              fullMerge,
+              unmergedSeries,
+              concurrentMergeSeriesNum,
+              storageGroupName);
+      states = States.MERGE_CHUNKS;
+      chunkTask.mergeSeries();
+      if (Thread.interrupted()) {
+        logger.info("Merge task {} aborted", taskName);
+        abort();
+        return;
+      }
 
-    fileTask =
-        new MergeFileTask(taskName, mergeContext, mergeLogger, resource, resource.getSeqFiles());
-    states = States.MERGE_FILES;
-    chunkTask = null;
-    fileTask.mergeFiles();
-    if (Thread.interrupted()) {
-      logger.info("Merge task {} aborted", taskName);
-      abort();
-      return;
-    }
+      fileTask =
+          new MergeFileTask(taskName, mergeContext, mergeLogger, resource, resource.getSeqFiles());
+      states = States.MERGE_FILES;
+      chunkTask = null;
+      fileTask.mergeFiles();
+      if (Thread.interrupted()) {
+        logger.info("Merge task {} aborted", taskName);
+        abort();
+        return;
+      }
 
-    states = States.CLEAN_UP;
-    fileTask = null;
-    cleanUp(true);
-    if (logger.isInfoEnabled()) {
-      double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0;
-      double byteRate = totalFileSize / elapsedTime / 1024 / 1024;
-      double seriesRate = unmergedSeries.size() / elapsedTime;
-      double chunkRate = mergeContext.getTotalChunkWritten() / elapsedTime;
-      double fileRate =
-          (resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime;
-      double ptRate = mergeContext.getTotalPointWritten() / elapsedTime;
-      logger.info(
-          "{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
-              + "fileRate: {}/s, ptRate: {}/s",
-          taskName,
-          elapsedTime,
-          byteRate,
-          seriesRate,
-          chunkRate,
-          fileRate,
-          ptRate);
+      states = States.CLEAN_UP;
+      fileTask = null;
+      cleanUp(true);
+      if (logger.isInfoEnabled()) {
+        double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0;
+        double byteRate = totalFileSize / elapsedTime / 1024 / 1024;
+        double seriesRate = unmergedSeries.size() / elapsedTime;
+        double chunkRate = mergeContext.getTotalChunkWritten() / elapsedTime;
+        double fileRate =
+            (resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime;
+        double ptRate = mergeContext.getTotalPointWritten() / elapsedTime;
+        logger.info(
+            "{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
+                + "fileRate: {}/s, ptRate: {}/s",
+            taskName,
+            elapsedTime,
+            byteRate,
+            seriesRate,
+            chunkRate,
+            fileRate,
+            ptRate);
+      }
+    } catch (Exception e) {
+      logger.error("Runtime exception in merge {}", taskName, e);
+      abort();
     }
   }
 
   void cleanUp(boolean executeCallback) throws IOException {
     logger.info("{} is cleaning up", taskName);
 
-    resource.clear();
-    mergeContext.clear();
-
     if (mergeLogger != null) {
       mergeLogger.close();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index 32cd897..cb1e39e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -53,6 +53,7 @@ public class RecoverMergeTask extends MergeTask {
 
   private LogAnalyzer analyzer;
 
+  // MERGE TODO: get the seqFiles and unseqFiles to be recovered from merge.log
   public RecoverMergeTask(
       List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 26261df..75b2175 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -174,8 +174,8 @@ public class StorageGroupProcessor {
   private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
   /** time partition id in the storage group -> tsFileProcessor for this time partition */
   private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
-  /** compactionMergeWorking is used to wait for last compaction to be done. */
-  private volatile boolean compactionMergeWorking = false;
+  /** isCompactionWorking is used to wait for last compaction to be done. */
+  private volatile boolean isCompactionWorking = false;
   // upgrading sequence TsFile resource list
   private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
 
@@ -446,33 +446,37 @@ public class StorageGroupProcessor {
         recoverTsFiles(value, false);
       }
 
-      String taskName =
+      String unseqMergeTaskName =
           logicalStorageGroupName + "-" + virtualStorageGroupId + "-" + System.currentTimeMillis();
       File mergingMods =
           SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MERGING_MODIFICATION_FILE_NAME);
       if (mergingMods.exists()) {
         this.tsFileManagement.mergingModification = new ModificationFile(mergingMods.getPath());
       }
+
+      // MERGE TODO: only pass the tsFileManagement into RecoverMergeTask
       RecoverMergeTask recoverMergeTask =
           new RecoverMergeTask(
               new ArrayList<>(tsFileManagement.getTsFileList(true)),
               tsFileManagement.getTsFileList(false),
               storageGroupSysDir.getPath(),
               tsFileManagement::mergeEndAction,
-              taskName,
-              IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(),
+              unseqMergeTaskName,
+              IoTDBDescriptor.getInstance().getConfig().isFullMerge(),
               logicalStorageGroupName + "-" + virtualStorageGroupId);
       logger.info(
           "{} - {} a RecoverMergeTask {} starts...",
           logicalStorageGroupName,
           virtualStorageGroupId,
-          taskName);
+          unseqMergeTaskName);
       recoverMergeTask.recoverMerge(
           IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
       if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
         mergingMods.delete();
       }
-      recoverCompaction();
+
+      // MERGE TODO: move unseqMergeRecover into TsFileManagement.CompactionRecoverTask()
+      recoverLevelCompaction();
       for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
         long partitionNum = resource.getTimePartition();
         updatePartitionFileVersion(partitionNum, resource.getVersion());
@@ -511,27 +515,25 @@ public class StorageGroupProcessor {
       globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
     }
 
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
-        && seqTsFileResources.size() > 0) {
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
       for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
-        executeCompaction(
-            timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+        executeCompaction(timePartitionId, IoTDBDescriptor.getInstance().getConfig().isFullMerge());
       }
     }
   }
 
-  private void recoverCompaction() {
-    if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      compactionMergeWorking = true;
+  private void recoverLevelCompaction() {
+    if (!CompactionTaskManager.getInstance().isTerminated()) {
+      isCompactionWorking = true;
       logger.info(
           "{} - {} submit a compaction recover merge task",
           logicalStorageGroupName,
           virtualStorageGroupId);
       try {
-        CompactionMergeTaskPoolManager.getInstance()
+        CompactionTaskManager.getInstance()
             .submitTask(
                 logicalStorageGroupName,
-                tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
+                tsFileManagement.new LevelCompactionRecoverTask(this::closeCompactionMergeCallBack));
       } catch (RejectedExecutionException e) {
         this.closeCompactionMergeCallBack(false, 0);
         logger.error(
@@ -1971,20 +1973,20 @@ public class StorageGroupProcessor {
 
     executeCompaction(
         tsFileProcessor.getTimeRangeId(),
-        IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+        IoTDBDescriptor.getInstance().getConfig().isFullMerge());
   }
 
   private void executeCompaction(long timePartition, boolean fullMerge) {
-    if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      compactionMergeWorking = true;
+    if (!isCompactionWorking && !CompactionTaskManager.getInstance().isTerminated()) {
+      isCompactionWorking = true;
       logger.info(
           "{} submit a compaction merge task",
           logicalStorageGroupName + "-" + virtualStorageGroupId);
       try {
         // fork and filter current tsfile, then commit then to compaction merge
         tsFileManagement.forkCurrentFileList(timePartition);
-        tsFileManagement.setForceFullMerge(fullMerge);
-        CompactionMergeTaskPoolManager.getInstance()
+        tsFileManagement.setFullMerge(fullMerge);
+        CompactionTaskManager.getInstance()
             .submitTask(
                 logicalStorageGroupName,
                 tsFileManagement
@@ -2007,9 +2009,9 @@ public class StorageGroupProcessor {
   private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
     if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
       executeCompaction(
-          timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+          timePartitionId, IoTDBDescriptor.getInstance().getConfig().isFullMerge());
     } else {
-      this.compactionMergeWorking = false;
+      this.isCompactionWorking = false;
     }
   }
 
@@ -2842,7 +2844,7 @@ public class StorageGroupProcessor {
     writeLock();
     try {
       // abort ongoing comapctions and merges
-      CompactionMergeTaskPoolManager.getInstance().abortCompaction(logicalStorageGroupName);
+      CompactionTaskManager.getInstance().abortCompaction(logicalStorageGroupName);
       MergeManager.getINSTANCE().abortMerge(logicalStorageGroupName);
       // close all working files that should be removed
       removePartitions(filter, workSequenceTsFileProcessors.entrySet());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 51a64ac..e4458ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
@@ -108,7 +108,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(TVListAllocator.getInstance());
     registerManager.register(CacheHitRatioMonitor.getInstance());
     registerManager.register(MergeManager.getINSTANCE());
-    registerManager.register(CompactionMergeTaskPoolManager.getInstance());
+    registerManager.register(CompactionTaskManager.getInstance());
     JMXService.registerMBean(getInstance(), mbeanName);
     registerManager.register(StorageEngine.getInstance());
     registerManager.register(TemporaryQueryDataFileService.getInstance());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
index cf7441f..1c0dfeb 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
@@ -69,11 +69,10 @@ public class MergeLogTest extends MergeTest {
             new MergeResource(seqResources.subList(0, 1), unseqResources.subList(0, 1)),
             tempSGDir.getPath(),
             this::testCallBack,
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
   }
 
   private void testCallBack(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
index 9842f53..2232521 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
@@ -99,20 +99,11 @@ public class MergeManagerTest extends MergeTest {
     private String progress = "0";
 
     public FakedMainMergeTask(int serialNum) {
-      super(null, null, null, null, false, 0, null);
+      super(null, null, null, false, 0, null);
       this.serialNum = serialNum;
     }
 
     @Override
-    public Void call() {
-      while (!Thread.currentThread().isInterrupted()) {
-        // wait until interrupt
-      }
-      progress = "1";
-      return null;
-    }
-
-    @Override
     public String getStorageGroupName() {
       return "test";
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
index 6c70fe7..0684f46 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
@@ -185,11 +185,10 @@ public class MergeOverLapTest extends MergeTest {
             new MergeResource(seqResources, unseqResources),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             true,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
index ca816cf..84d4c5f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
@@ -48,8 +48,8 @@ public class MergePerfTest extends MergeTest {
     resource.setCacheDeviceMeta(true);
     MergeTask mergeTask =
         new MergeTask(
-            resource, tempSGDir.getPath(), (k, v, l) -> {}, "test", fullMerge, 100, MERGE_TEST_SG);
-    mergeTask.call();
+            resource, tempSGDir.getPath(), (k, v, l) -> {}, fullMerge, 100, MERGE_TEST_SG);
+    mergeTask.doMerge();
     timeConsumption = System.currentTimeMillis() - timeConsumption;
     tearDown();
     FileUtils.deleteDirectory(tempSGDir);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index d4f5dfe..5f7c036 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -79,11 +79,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(seqResources, unseqResources),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -123,11 +122,10 @@ public class MergeTaskTest extends MergeTest {
             (k, v, l) -> {
               assertEquals(499, k.get(2).getEndTime("root.mergeTest.device1"));
             },
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
   }
 
   @Test
@@ -177,11 +175,10 @@ public class MergeTaskTest extends MergeTest {
             (k, v, l) -> {
               assertEquals(49, k.get(0).getEndTime("root.mergeTest.device1"));
             },
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
   }
 
   @Test
@@ -191,11 +188,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(seqResources, unseqResources),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             true,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -235,11 +231,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(seqResources, unseqResources),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -275,11 +270,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(seqResources, unseqResources.subList(0, 1)),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -319,11 +313,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(seqResources, unseqResources.subList(5, 6)),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -359,11 +352,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(seqResources, unseqResources.subList(0, 5)),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -424,11 +416,10 @@ public class MergeTaskTest extends MergeTest {
                 e.printStackTrace();
               }
             },
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -474,11 +465,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(testSeqResources, testUnseqResource),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -540,11 +530,10 @@ public class MergeTaskTest extends MergeTest {
             (k, v, l) -> {
               assertEquals(99, k.get(0).getEndTime("root.mergeTest.device1"));
             },
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
   }
 
   private void prepareFileWithLastSensor(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 3d7e2b6..8ad49c7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -610,7 +610,7 @@ public class StorageGroupProcessorTest {
     }
 
     processor.syncCloseAllWorkingTsFileProcessors();
-    processor.merge(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+    processor.merge(IoTDBDescriptor.getInstance().getConfig().isFullMerge());
     while (processor.getTsFileManagement().isUnseqMerging) {
       // wait
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index d01b9d9..289ce56 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.integration;
 
 import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
@@ -379,7 +379,7 @@ public class IoTDBRestartIT {
     }
 
     try {
-      CompactionMergeTaskPoolManager.getInstance().waitAllCompactionFinish();
+      CompactionTaskManager.getInstance().waitAllCompactionFinish();
       Thread.sleep(10000);
       EnvironmentUtils.restartDaemon();
     } catch (Exception e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index fc29451..9892780 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.TriggerManagementException;
@@ -86,7 +86,7 @@ public class EnvironmentUtils {
 
   public static void cleanEnv() throws IOException, StorageEngineException {
     // wait all compaction finished
-    CompactionMergeTaskPoolManager.getInstance().waitAllCompactionFinish();
+    CompactionTaskManager.getInstance().waitAllCompactionFinish();
 
     // deregister all user defined classes
     try {