Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/01 12:22:43 UTC

[iotdb] branch clear_merge_code created (now 20ce1dc)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch clear_merge_code
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 20ce1dc  clean merge code

This branch includes the following new commits:

     new 20ce1dc  clean merge code

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: clean merge code

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch clear_merge_code
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 20ce1dcb4860839a3509bde11e8e0467e2612301
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 1 20:21:59 2021 +0800

    clean merge code
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  10 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   4 +-
 ...PoolManager.java => CompactionTaskManager.java} |  12 +-
 .../db/engine/compaction/TsFileManagement.java     | 109 +++---------
 .../level/LevelCompactionTsFileManagement.java     | 191 +++++++++------------
 .../iotdb/db/engine/merge/manage/MergeManager.java |   2 +-
 .../db/engine/merge/manage/MergeResource.java      |  48 ++++--
 .../iotdb/db/engine/merge/recover/LogAnalyzer.java |   7 +-
 .../engine/merge/selector/IMergeFileSelector.java  |   4 +
 .../iotdb/db/engine/merge/task/MergeTask.java      | 183 ++++++++++----------
 .../db/engine/merge/task/RecoverMergeTask.java     |   1 +
 .../engine/storagegroup/StorageGroupProcessor.java |  52 +++---
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   4 +-
 .../apache/iotdb/db/engine/merge/MergeLogTest.java |   3 +-
 .../iotdb/db/engine/merge/MergeManagerTest.java    |  11 +-
 .../iotdb/db/engine/merge/MergeOverLapTest.java    |   3 +-
 .../iotdb/db/engine/merge/MergePerfTest.java       |   4 +-
 .../iotdb/db/engine/merge/MergeTaskTest.java       |  33 ++--
 .../storagegroup/StorageGroupProcessorTest.java    |   2 +-
 .../iotdb/db/integration/IoTDBRestartIT.java       |   4 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   4 +-
 21 files changed, 302 insertions(+), 389 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6f9498b..61b8997 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -519,7 +519,7 @@ public class IoTDBConfig {
    * despite how much they are overflowed). This may increase merge overhead depending on how much
    * the SeqFiles are overflowed.
    */
-  private boolean forceFullMerge = true;
+  private boolean fullMerge = true;
 
   /** The limit of compaction merge can reach per second */
   private int mergeWriteThroughputMbPerSec = 8;
@@ -1404,12 +1404,12 @@ public class IoTDBConfig {
     this.enablePartialInsert = enablePartialInsert;
   }
 
-  public boolean isForceFullMerge() {
-    return forceFullMerge;
+  public boolean isFullMerge() {
+    return fullMerge;
   }
 
-  void setForceFullMerge(boolean forceFullMerge) {
-    this.forceFullMerge = forceFullMerge;
+  void setFullMerge(boolean fullMerge) {
+    this.fullMerge = fullMerge;
   }
 
   public int getCompactionThreadNum() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 645a534..6077a3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -479,10 +479,10 @@ public class IoTDBDescriptor {
           Long.parseLong(
               properties.getProperty(
                   "merge_interval_sec", Long.toString(conf.getMergeIntervalSec()))));
-      conf.setForceFullMerge(
+      conf.setFullMerge(
           Boolean.parseBoolean(
               properties.getProperty(
-                  "force_full_merge", Boolean.toString(conf.isForceFullMerge()))));
+                  "force_full_merge", Boolean.toString(conf.isFullMerge()))));
       conf.setCompactionThreadNum(
           Integer.parseInt(
               properties.getProperty(
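
For reference, the rename keeps the existing "force_full_merge" property key; only the accessor
names change. A minimal read-only sketch of the renamed getter (not part of the commit; the class
name is illustrative):

    import org.apache.iotdb.db.conf.IoTDBConfig;
    import org.apache.iotdb.db.conf.IoTDBDescriptor;

    public class FullMergeFlagExample {
      public static void main(String[] args) {
        // Formerly isForceFullMerge(); still backed by the "force_full_merge" property.
        IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
        System.out.println("full merge enabled: " + config.isFullMerge());
      }
    }
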
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index cdf6f4a..53dc9cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -46,17 +46,17 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
 
-/** CompactionMergeTaskPoolManager provides a ThreadPool to queue and run all compaction tasks. */
-public class CompactionMergeTaskPoolManager implements IService {
+/** CompactionTaskManager provides a ThreadPool to queue and run all compaction tasks. */
+public class CompactionTaskManager implements IService {
 
   private static final Logger logger =
-      LoggerFactory.getLogger(CompactionMergeTaskPoolManager.class);
-  private static final CompactionMergeTaskPoolManager INSTANCE =
-      new CompactionMergeTaskPoolManager();
+      LoggerFactory.getLogger(CompactionTaskManager.class);
+  private static final CompactionTaskManager INSTANCE =
+      new CompactionTaskManager();
   private ExecutorService pool;
   private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
 
-  public static CompactionMergeTaskPoolManager getInstance() {
+  public static CompactionTaskManager getInstance() {
     return INSTANCE;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index aea6e8b..439c50d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.engine.compaction;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
-import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
 import org.apache.iotdb.db.engine.merge.selector.MaxFileMergeFileSelector;
@@ -33,7 +32,6 @@ import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseCompactionMergeCallBack;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.MergeException;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import org.slf4j.Logger;
@@ -62,20 +60,16 @@ public abstract class TsFileManagement {
   /** Serialize queries, delete resource files, compaction cleanup files */
   private final ReadWriteLock compactionMergeLock = new ReentrantReadWriteLock();
 
-  public volatile boolean isUnseqMerging = false;
-  public volatile boolean isSeqMerging = false;
   /**
    * This is the modification file of the result of the current merge. Because the merged file may
    * be invisible at this moment, without this, deletion/update during merge could be lost.
    */
   public ModificationFile mergingModification;
 
-  private long mergeStartTime;
-
   /** whether execute merge chunk in this task */
   protected boolean isMergeExecutedInCurrentTask = false;
 
-  protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
+  protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isFullMerge();
   private final int maxOpenFileNumInEachUnseqCompaction =
       IoTDBDescriptor.getInstance().getConfig().getMaxOpenFileNumInEachUnseqCompaction();
 
@@ -84,7 +78,7 @@ public abstract class TsFileManagement {
     this.storageGroupDir = storageGroupDir;
   }
 
-  public void setForceFullMerge(boolean forceFullMerge) {
+  public void setFullMerge(boolean forceFullMerge) {
     isForceFullMerge = forceFullMerge;
   }
 
@@ -176,11 +170,11 @@ public abstract class TsFileManagement {
     }
   }
 
-  public class CompactionRecoverTask implements Callable<Void> {
+  public class LevelCompactionRecoverTask implements Callable<Void> {
 
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
 
-    public CompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+    public LevelCompactionRecoverTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
       this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
     }
 
@@ -194,107 +188,63 @@ public abstract class TsFileManagement {
     }
   }
 
-  public synchronized void merge(
+  protected void doUnseqMerge(
       boolean fullMerge,
       List<TsFileResource> seqMergeList,
-      List<TsFileResource> unSeqMergeList,
-      long dataTTL) {
-    if (isUnseqMerging) {
-      if (logger.isInfoEnabled()) {
-        logger.info(
-            "{} Last merge is ongoing, currently consumed time: {}ms",
-            storageGroupName,
-            (System.currentTimeMillis() - mergeStartTime));
-      }
-      return;
-    }
-    // wait until seq merge has finished
-    while (isSeqMerging) {
-      try {
-        wait(200);
-      } catch (InterruptedException e) {
-        logger.error("{} [Compaction] shutdown", storageGroupName, e);
-        Thread.currentThread().interrupt();
-        return;
-      }
-    }
-    isUnseqMerging = true;
-
-    if (seqMergeList.isEmpty()) {
-      logger.info("{} no seq files to be merged", storageGroupName);
-      isUnseqMerging = false;
-      return;
-    }
+      List<TsFileResource> unSeqMergeList) {
 
-    if (unSeqMergeList.isEmpty()) {
-      logger.info("{} no unseq files to be merged", storageGroupName);
-      isUnseqMerging = false;
+    if (seqMergeList.isEmpty() || unSeqMergeList.isEmpty()) {
+      logger.info("{} no files to be merged, seqFiles={}, unseqFiles={}",
+          storageGroupName, seqMergeList.size(), unSeqMergeList.size());
       return;
     }
 
+    // the number of unseq files in one merge should not exceed maxOpenFileNumInEachUnseqCompaction
     if (unSeqMergeList.size() > maxOpenFileNumInEachUnseqCompaction) {
-      logger.info(
-          "{} too much unseq files to be merged, reduce it to {}",
-          storageGroupName,
-          maxOpenFileNumInEachUnseqCompaction);
       unSeqMergeList = unSeqMergeList.subList(0, maxOpenFileNumInEachUnseqCompaction);
     }
 
-    long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
-    long timeLowerBound = System.currentTimeMillis() - dataTTL;
+    long timeLowerBound = System.currentTimeMillis() - IoTDBDescriptor.getInstance().getConfig().getDefaultTTL();
     MergeResource mergeResource = new MergeResource(seqMergeList, unSeqMergeList, timeLowerBound);
 
-    IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource);
+    IMergeFileSelector fileSelector = getMergeFileSelector(mergeResource);
     try {
       List[] mergeFiles = fileSelector.select();
       if (mergeFiles.length == 0) {
         logger.info(
-            "{} cannot select merge candidates under the budget {}", storageGroupName, budget);
-        isUnseqMerging = false;
+            "{} cannot select merge candidates under the budget {}",
+            storageGroupName, IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget());
         return;
       }
-      // avoid pending tasks holds the metadata and streams
-      mergeResource.clear();
-      String taskName = storageGroupName + "-" + System.currentTimeMillis();
-      // do not cache metadata until true candidates are chosen, or too much metadata will be
-      // cached during selection
-      mergeResource.setCacheDeviceMeta(true);
-
-      for (TsFileResource tsFileResource : mergeResource.getSeqFiles()) {
-        tsFileResource.setMerging(true);
-      }
-      for (TsFileResource tsFileResource : mergeResource.getUnseqFiles()) {
-        tsFileResource.setMerging(true);
-      }
 
-      mergeStartTime = System.currentTimeMillis();
+      mergeResource.startMerging();
+
       MergeTask mergeTask =
           new MergeTask(
               mergeResource,
               storageGroupDir,
               this::mergeEndAction,
-              taskName,
               fullMerge,
               fileSelector.getConcurrentMergeNum(),
               storageGroupName);
+
       mergingModification =
           new ModificationFile(storageGroupDir + File.separator + MERGING_MODIFICATION_FILE_NAME);
-      MergeManager.getINSTANCE().submitMainTask(mergeTask);
-      if (logger.isInfoEnabled()) {
-        logger.info(
-            "{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
-            storageGroupName,
-            taskName,
-            mergeFiles[0].size(),
-            mergeFiles[1].size());
-      }
 
-    } catch (MergeException | IOException e) {
+      logger.info(
+          "{} start merge {} seqFiles, {} unseqFiles", storageGroupName,
+          mergeFiles[0].size(),
+          mergeFiles[1].size());
+
+      mergeTask.doMerge();
+
+    } catch (Exception e) {
       logger.error("{} cannot select file for merge", storageGroupName, e);
     }
   }
 
-  private IMergeFileSelector getMergeFileSelector(long budget, MergeResource resource) {
+  private IMergeFileSelector getMergeFileSelector(MergeResource resource) {
+    long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
     MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
     switch (strategy) {
       case MAX_FILE_NUM:
@@ -402,14 +352,12 @@ public abstract class TsFileManagement {
 
     if (Thread.currentThread().isInterrupted() || unseqFiles.isEmpty()) {
       // merge task abort, or merge runtime exception arose, just end this merge
-      isUnseqMerging = false;
       logger.info("{} a merge task abnormally ends", storageGroupName);
       return;
     }
     removeUnseqFiles(unseqFiles);
 
-    for (int i = 0; i < seqFiles.size(); i++) {
-      TsFileResource seqFile = seqFiles.get(i);
+    for (TsFileResource seqFile : seqFiles) {
       // get both seqFile lock and merge lock
       doubleWriteLock(seqFile);
 
@@ -430,7 +378,6 @@ public abstract class TsFileManagement {
 
     try {
       removeMergingModification();
-      isUnseqMerging = false;
       Files.delete(mergeLog.toPath());
     } catch (IOException e) {
       logger.error(
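
Pulled together from the hunk above, the reworked unseq-merge path is now a plain synchronous call
chain. A condensed sketch of the flow inside doUnseqMerge() (illustrative only; logging and the
surrounding try/catch are omitted):

    MergeResource mergeResource =
        new MergeResource(
            seqMergeList,
            unSeqMergeList,
            System.currentTimeMillis() - IoTDBDescriptor.getInstance().getConfig().getDefaultTTL());
    IMergeFileSelector fileSelector = getMergeFileSelector(mergeResource);
    List[] mergeFiles = fileSelector.select();   // [selected seqFiles, selected unseqFiles]
    if (mergeFiles.length != 0) {
      // mark the chosen files as merging and start caching device metadata
      mergeResource.startMerging();
      MergeTask mergeTask =
          new MergeTask(
              mergeResource,
              storageGroupDir,
              this::mergeEndAction,
              fullMerge,
              fileSelector.getConcurrentMergeNum(),
              storageGroupName);
      // runs synchronously in the calling compaction thread (no MergeManager submission)
      mergeTask.doMerge();
    }
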
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 98f1c2e..0b6730d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -64,7 +64,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   private static final Logger logger =
       LoggerFactory.getLogger(LevelCompactionTsFileManagement.class);
 
-  private final int seqLevelNum =
+  private final int totalSeqLevelNum =
       Math.max(IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum(), 1);
   private final int seqFileNumInEachLevel =
       Math.max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
@@ -267,7 +267,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       long timePartitionId = tsFileResource.getTimePartition();
       int level = getMergeLevel(tsFileResource.getTsFile());
       if (sequence) {
-        if (level <= seqLevelNum - 1) {
+        if (level <= totalSeqLevelNum - 1) {
           // current file has normal level
           sequenceTsFileResources
               .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
@@ -277,7 +277,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           // current file has too high level
           sequenceTsFileResources
               .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
-              .get(seqLevelNum - 1)
+              .get(totalSeqLevelNum - 1)
               .add(tsFileResource);
         }
       } else {
@@ -397,7 +397,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       if (sequence) {
         for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
             sequenceTsFileResources.values()) {
-          for (int i = seqLevelNum - 1; i >= 0; i--) {
+          for (int i = totalSeqLevelNum - 1; i >= 0; i--) {
             result += partitionSequenceTsFileResource.get(i).size();
           }
         }
@@ -564,7 +564,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       forkTsFileList(
           forkedSequenceTsFileResources,
           sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
-          seqLevelNum);
+          totalSeqLevelNum);
       // we have to copy all unseq file
       forkTsFileList(
           forkedUnSequenceTsFileResources,
@@ -594,47 +594,28 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
 
   @Override
   protected void merge(long timePartition) {
-    isMergeExecutedInCurrentTask =
-        merge(
-            forkedSequenceTsFileResources, true, timePartition, seqLevelNum, seqFileNumInEachLevel);
-    if (enableUnseqCompaction
-        && unseqLevelNum <= 1
-        && forkedUnSequenceTsFileResources.get(0).size() > 0) {
-      isMergeExecutedInCurrentTask = true;
-      merge(
-          isForceFullMerge,
-          getTsFileListByTimePartition(true, timePartition),
-          forkedUnSequenceTsFileResources.get(0),
-          Long.MAX_VALUE);
-    } else {
-      isMergeExecutedInCurrentTask =
-          merge(
-              forkedUnSequenceTsFileResources,
-              false,
-              timePartition,
-              unseqLevelNum,
-              unseqFileNumInEachLevel);
+    // do unseq compaction if there is an unseq file in the max unseq level
+    if (enableUnseqCompaction && forkedUnSequenceTsFileResources.get(unseqLevelNum - 1).size() > 0) {
+      doUnseqMerge(isForceFullMerge, getTsFileListByTimePartition(true, timePartition),
+          forkedUnSequenceTsFileResources.get(unseqLevelNum - 1));
     }
+
+    doLevelCompaction(forkedSequenceTsFileResources, true, timePartition, totalSeqLevelNum,
+        seqFileNumInEachLevel);
+
+    doLevelCompaction(forkedUnSequenceTsFileResources, false, timePartition, unseqLevelNum,
+        unseqFileNumInEachLevel);
+
   }
 
-  @SuppressWarnings("squid:S3776")
-  private boolean merge(
-      List<List<TsFileResource>> mergeResources,
+  @SuppressWarnings("squid:S3776") //MERGE TODO: move to a LevelCompactionExecutor
+  private boolean doLevelCompaction(
+      List<List<TsFileResource>> mergeResources, // each level is a List<TsFileResource>
       boolean sequence,
       long timePartition,
       int currMaxLevel,
       int currMaxFileNumInEachLevel) {
     // wait until unseq merge has finished
-    while (isUnseqMerging) {
-      try {
-        Thread.sleep(200);
-      } catch (InterruptedException e) {
-        logger.error("{} [Compaction] shutdown", storageGroupName, e);
-        Thread.currentThread().interrupt();
-        return false;
-      }
-    }
-    isSeqMerging = true;
     long startTimeMillis = System.currentTimeMillis();
     // whether execute merge chunk in the loop below
     boolean isMergeExecutedInCurrentTask = false;
@@ -646,83 +627,72 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         if (currMaxFileNumInEachLevel <= currLevelTsFileResource.size()) {
           // just merge part of the file
           isMergeExecutedInCurrentTask = true;
-          // level is numbered from 0
-          if (enableUnseqCompaction && !sequence && i == currMaxLevel - 2) {
-            // do not merge current unseq file level to upper level and just merge all of them to
-            // seq file
-            isSeqMerging = false;
-            merge(
-                isForceFullMerge,
-                getTsFileListByTimePartition(true, timePartition),
-                mergeResources.get(i),
-                Long.MAX_VALUE);
-          } else {
-            compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
-            // log source file list and target file for recover
-            for (TsFileResource mergeResource : mergeResources.get(i)) {
-              compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
-            }
-            File newLevelFile =
-                TsFileResource.modifyTsFileNameMergeCnt(mergeResources.get(i).get(0).getTsFile());
-            compactionLogger.logSequence(sequence);
-            compactionLogger.logFile(TARGET_NAME, newLevelFile);
-            List<TsFileResource> toMergeTsFiles =
-                mergeResources.get(i).subList(0, currMaxFileNumInEachLevel);
+          compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
+          // log source file list and target file for recover
+          for (TsFileResource mergeResource : mergeResources.get(i)) {
+            compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
+          }
+          File newLevelFile =
+              TsFileResource.modifyTsFileNameMergeCnt(mergeResources.get(i).get(0).getTsFile());
+          compactionLogger.logSequence(sequence);
+          compactionLogger.logFile(TARGET_NAME, newLevelFile);
+          List<TsFileResource> toMergeTsFiles =
+              mergeResources.get(i).subList(0, currMaxFileNumInEachLevel);
+          logger.info(
+              "{} [Compaction] merge level-{}'s {} TsFiles to next level",
+              storageGroupName,
+              i,
+              toMergeTsFiles.size());
+          for (TsFileResource toMergeTsFile : toMergeTsFiles) {
             logger.info(
-                "{} [Compaction] merge level-{}'s {} TsFiles to next level",
-                storageGroupName,
-                i,
-                toMergeTsFiles.size());
-            for (TsFileResource toMergeTsFile : toMergeTsFiles) {
-              logger.info(
-                  "{} [Compaction] start to merge TsFile {}", storageGroupName, toMergeTsFile);
-            }
+                "{} [Compaction] start to merge TsFile {}", storageGroupName, toMergeTsFile);
+          }
 
-            TsFileResource newResource = new TsFileResource(newLevelFile);
-            List<Modification> modifications = new ArrayList<>();
-            // merge, read from source files and write to target file
-            CompactionUtils.merge(
-                newResource,
-                toMergeTsFiles,
-                storageGroupName,
-                compactionLogger,
-                new HashSet<>(),
-                sequence,
-                modifications);
-            logger.info(
-                "{} [Compaction] merged level-{}'s {} TsFiles to next level, and start to delete old files",
-                storageGroupName,
-                i,
-                toMergeTsFiles.size());
-            writeLock();
-            try {
-              if (Thread.currentThread().isInterrupted()) {
-                throw new InterruptedException(
-                    String.format("%s [Compaction] abort", storageGroupName));
-              }
+          TsFileResource newResource = new TsFileResource(newLevelFile);
+          List<Modification> modifications = new ArrayList<>();
+          // merge, read from source files and write to target file
+          CompactionUtils.merge(
+              newResource,
+              toMergeTsFiles,
+              storageGroupName,
+              compactionLogger,
+              new HashSet<>(),
+              sequence,
+              modifications);
+          logger.info(
+              "{} [Compaction] merged level-{}'s {} TsFiles to next level, and start to delete old files",
+              storageGroupName,
+              i,
+              toMergeTsFiles.size());
+          writeLock();
+          try {
+            if (Thread.currentThread().isInterrupted()) {
+              throw new InterruptedException(
+                  String.format("%s [Compaction] abort", storageGroupName));
+            }
 
-              if (sequence) {
-                sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
-              } else {
-                unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
-              }
-              deleteLevelFilesInList(timePartition, toMergeTsFiles, i, sequence);
-              if (mergeResources.size() > i + 1) {
-                mergeResources.get(i + 1).add(newResource);
-              }
-            } finally {
-              writeUnlock();
+            if (sequence) {
+              sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
+            } else {
+              unSequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
             }
-            deleteLevelFilesInDisk(toMergeTsFiles);
-            renameLevelFilesMods(modifications, toMergeTsFiles, newResource);
-            compactionLogger.close();
-            File logFile =
-                FSFactoryProducer.getFSFactory()
-                    .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
-            if (logFile.exists()) {
-              Files.delete(logFile.toPath());
+            deleteLevelFilesInList(timePartition, toMergeTsFiles, i, sequence);
+            if (mergeResources.size() > i + 1) {
+              mergeResources.get(i + 1).add(newResource);
             }
+          } finally {
+            writeUnlock();
+          }
+          deleteLevelFilesInDisk(toMergeTsFiles);
+          renameLevelFilesMods(modifications, toMergeTsFiles, newResource);
+          compactionLogger.close();
+          File logFile =
+              FSFactoryProducer.getFSFactory()
+                  .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
+          if (logFile.exists()) {
+            Files.delete(logFile.toPath());
           }
+          break;
         }
       }
     } catch (Exception e) {
@@ -736,7 +706,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       restoreCompaction();
       logger.error("Error occurred in Compaction Merge thread", e);
     } finally {
-      isSeqMerging = false;
       // reset the merge working state to false
       logger.info(
           "{} [Compaction] merge end time isSeq = {}, consumption: {} ms",
@@ -749,7 +718,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
 
   private List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) {
     List<SortedSet<TsFileResource>> newSequenceTsFileResources = new ArrayList<>();
-    for (int i = 0; i < seqLevelNum; i++) {
+    for (int i = 0; i < totalSeqLevelNum; i++) {
       newSequenceTsFileResources.add(
           new TreeSet<>(
               (o1, o2) -> {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index f9112e1..84c6859 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -253,7 +253,7 @@ public class MergeManager implements IService, MergeManagerMBean {
   private void mergeAll() {
     try {
       StorageEngine.getInstance()
-          .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+          .mergeAll(IoTDBDescriptor.getInstance().getConfig().isFullMerge());
     } catch (StorageEngineException e) {
       logger.error("Cannot perform a global merge because", e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index d7bc827..050d83e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.reader.resource.CachedUnseqResourceMergeReader;
 import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -91,26 +92,27 @@ public class MergeResource {
     this.unseqFiles = unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
   }
 
-  public void clear() throws IOException {
-    for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
-      sequenceReader.close();
-    }
-    for (RestorableTsFileIOWriter writer : fileWriterCache.values()) {
-      writer.close();
-    }
-
-    fileReaderCache.clear();
-    fileWriterCache.clear();
-    modificationCache.clear();
-    measurementSchemaMap.clear();
-    chunkWriterCache.clear();
-  }
-
   public IMeasurementSchema getSchema(PartialPath path) {
     return measurementSchemaMap.get(path);
   }
 
   /**
+   * startMerging() is called after selecting files
+   *
+   * do not cache metadata until true candidates are chosen, or too much metadata will be
+   * cached during selection
+   */
+  public void startMerging() {
+    cacheDeviceMeta = true;
+    for (TsFileResource tsFileResource : seqFiles) {
+      tsFileResource.setMerging(true);
+    }
+    for (TsFileResource tsFileResource : unseqFiles) {
+      tsFileResource.setMerging(true);
+    }
+  }
+
+  /**
    * Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a SeqFile.
    * The path of the merge temp file will be the seqFile's + ".merge".
    *
@@ -290,4 +292,20 @@ public class MergeResource {
   public Map<String, Pair<Long, Long>> getStartEndTime(TsFileResource tsFileResource) {
     return startEndTimeCache.getOrDefault(tsFileResource, new HashMap<>());
   }
+
+  @TestOnly
+  public void clear() throws IOException {
+    for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
+      sequenceReader.close();
+    }
+    for (RestorableTsFileIOWriter writer : fileWriterCache.values()) {
+      writer.close();
+    }
+
+    fileReaderCache.clear();
+    fileWriterCache.clear();
+    modificationCache.clear();
+    measurementSchemaMap.clear();
+    chunkWriterCache.clear();
+  }
 }
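
The new startMerging() bundles the bookkeeping that callers previously did by hand (per-file
setMerging(true) plus setCacheDeviceMeta(true)); clear() moves to @TestOnly and is no longer called
on the merge path. A short usage sketch (variable names are placeholders):

    MergeResource resource = new MergeResource(selectedSeqFiles, selectedUnseqFiles, timeLowerBound);
    // flags every selected seq/unseq file as merging and enables device-metadata caching
    resource.startMerging();
    // ... hand the resource to a MergeTask; tests may still call resource.clear() afterwards
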
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
index 9693d3c..e6f84fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
@@ -78,6 +78,7 @@ public class LogAnalyzer {
 
   private Status status;
 
+  // MERGE TODO: add two methods: List<String> getSeqTsFiles()  List<String> getUnseqTsFiles()
   public LogAnalyzer(
       MergeResource resource, String taskName, File logFile, String storageGroupName) {
     this.resource = resource;
@@ -102,10 +103,11 @@ public class LogAnalyzer {
 
         analyzeUnseqFiles(bufferedReader);
 
-        List<PartialPath> storageGroupPaths =
+        // get all timeseries in this storage group
+        List<PartialPath> timeseriesPaths =
             IoTDB.metaManager.getAllTimeseriesPath(new PartialPath(storageGroupName + ".*"));
         unmergedPaths = new ArrayList<>();
-        unmergedPaths.addAll(storageGroupPaths);
+        unmergedPaths.addAll(timeseriesPaths);
 
         analyzeMergedSeries(bufferedReader, unmergedPaths);
 
@@ -120,6 +122,7 @@ public class LogAnalyzer {
       return;
     }
     long startTime = System.currentTimeMillis();
+    //
     List<TsFileResource> mergeSeqFiles = new ArrayList<>();
     while ((currLine = bufferedReader.readLine()) != null) {
       if (STR_UNSEQ_FILES.equals(currLine)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java
index 077a58b..63fc04c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/selector/IMergeFileSelector.java
@@ -29,6 +29,10 @@ import java.util.List;
  */
 public interface IMergeFileSelector {
 
+  /**
+   * @return seqFileList, unseqFileList
+   * @throws MergeException
+   */
   List[] select() throws MergeException;
 
   int getConcurrentMergeNum();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 381651c..89f4574 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -51,7 +51,7 @@ import java.util.concurrent.Callable;
  * merged chunks in the temp files back to the seqFiles or move the unmerged chunks in the seqFiles
  * into temp files and replace the seqFiles with the temp files. 3. remove unseqFiles
  */
-public class MergeTask implements Callable<Void> {
+public class MergeTask {
 
   public static final String MERGE_SUFFIX = ".merge";
   private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
@@ -67,7 +67,7 @@ public class MergeTask implements Callable<Void> {
   States states = States.START;
   MergeMultiChunkTask chunkTask;
   MergeFileTask fileTask;
-  private MergeCallback callback;
+  protected MergeCallback callback;
 
   MergeTask(
       List<TsFileResource> seqFiles,
@@ -90,29 +90,18 @@ public class MergeTask implements Callable<Void> {
       MergeResource mergeResource,
       String storageGroupSysDir,
       MergeCallback callback,
-      String taskName,
       boolean fullMerge,
       int concurrentMergeSeriesNum,
       String storageGroupName) {
     this.resource = mergeResource;
     this.storageGroupSysDir = storageGroupSysDir;
     this.callback = callback;
-    this.taskName = taskName;
+    this.taskName = storageGroupName;
     this.fullMerge = fullMerge;
     this.concurrentMergeSeriesNum = concurrentMergeSeriesNum;
     this.storageGroupName = storageGroupName;
   }
 
-  @Override
-  public Void call() throws Exception {
-    try {
-      doMerge();
-    } catch (Exception e) {
-      logger.error("Runtime exception in merge {}", taskName, e);
-      abort();
-    }
-    return null;
-  }
 
   private void abort() throws IOException {
     states = States.ABORTED;
@@ -125,101 +114,103 @@ public class MergeTask implements Callable<Void> {
         new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME));
   }
 
-  private void doMerge() throws IOException, MetadataException {
-    if (resource.getSeqFiles().isEmpty()) {
-      logger.info("{} no sequence file to merge into, so will abort task.", taskName);
-      abort();
-      return;
-    }
-    if (logger.isInfoEnabled()) {
-      logger.info(
-          "{} starts to merge {} seqFiles, {} unseqFiles",
-          taskName,
-          resource.getSeqFiles().size(),
-          resource.getUnseqFiles().size());
-    }
-    long startTime = System.currentTimeMillis();
-    long totalFileSize =
-        MergeUtils.collectFileSizes(resource.getSeqFiles(), resource.getUnseqFiles());
-    mergeLogger = new MergeLogger(storageGroupSysDir);
+  public void doMerge() throws IOException {
+    try {
+      if (resource.getSeqFiles().isEmpty()) {
+        logger.info("{} no sequence file to merge into, so will abort task.", taskName);
+        abort();
+        return;
+      }
+      if (logger.isInfoEnabled()) {
+        logger.info(
+            "{} starts to merge {} seqFiles, {} unseqFiles",
+            taskName,
+            resource.getSeqFiles().size(),
+            resource.getUnseqFiles().size());
+      }
+      long startTime = System.currentTimeMillis();
+      long totalFileSize =
+          MergeUtils.collectFileSizes(resource.getSeqFiles(), resource.getUnseqFiles());
+      mergeLogger = new MergeLogger(storageGroupSysDir);
 
-    mergeLogger.logFiles(resource);
+      mergeLogger.logFiles(resource);
 
-    Set<PartialPath> devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName));
-    Map<PartialPath, IMeasurementSchema> measurementSchemaMap = new HashMap<>();
-    List<PartialPath> unmergedSeries = new ArrayList<>();
-    for (PartialPath device : devices) {
-      MNode deviceNode = IoTDB.metaManager.getNodeByPath(device);
-      // todo add template merge logic
-      for (Entry<String, MNode> entry : deviceNode.getChildren().entrySet()) {
-        PartialPath path = device.concatNode(entry.getKey());
-        measurementSchemaMap.put(path, ((MeasurementMNode) entry.getValue()).getSchema());
-        unmergedSeries.add(path);
+      Set<PartialPath> devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName));
+      Map<PartialPath, IMeasurementSchema> measurementSchemaMap = new HashMap<>();
+      List<PartialPath> unmergedSeries = new ArrayList<>();
+      for (PartialPath device : devices) {
+        MNode deviceNode = IoTDB.metaManager.getNodeByPath(device);
+        // todo add template merge logic
+        for (Entry<String, MNode> entry : deviceNode.getChildren().entrySet()) {
+          PartialPath path = device.concatNode(entry.getKey());
+          measurementSchemaMap.put(path, ((MeasurementMNode) entry.getValue()).getSchema());
+          unmergedSeries.add(path);
+        }
       }
-    }
-    resource.setMeasurementSchemaMap(measurementSchemaMap);
+      resource.setMeasurementSchemaMap(measurementSchemaMap);
 
-    mergeLogger.logMergeStart();
+      mergeLogger.logMergeStart();
 
-    chunkTask =
-        new MergeMultiChunkTask(
-            mergeContext,
-            taskName,
-            mergeLogger,
-            resource,
-            fullMerge,
-            unmergedSeries,
-            concurrentMergeSeriesNum,
-            storageGroupName);
-    states = States.MERGE_CHUNKS;
-    chunkTask.mergeSeries();
-    if (Thread.interrupted()) {
-      logger.info("Merge task {} aborted", taskName);
-      abort();
-      return;
-    }
+      chunkTask =
+          new MergeMultiChunkTask(
+              mergeContext,
+              taskName,
+              mergeLogger,
+              resource,
+              fullMerge,
+              unmergedSeries,
+              concurrentMergeSeriesNum,
+              storageGroupName);
+      states = States.MERGE_CHUNKS;
+      chunkTask.mergeSeries();
+      if (Thread.interrupted()) {
+        logger.info("Merge task {} aborted", taskName);
+        abort();
+        return;
+      }
 
-    fileTask =
-        new MergeFileTask(taskName, mergeContext, mergeLogger, resource, resource.getSeqFiles());
-    states = States.MERGE_FILES;
-    chunkTask = null;
-    fileTask.mergeFiles();
-    if (Thread.interrupted()) {
-      logger.info("Merge task {} aborted", taskName);
-      abort();
-      return;
-    }
+      fileTask =
+          new MergeFileTask(taskName, mergeContext, mergeLogger, resource, resource.getSeqFiles());
+      states = States.MERGE_FILES;
+      chunkTask = null;
+      fileTask.mergeFiles();
+      if (Thread.interrupted()) {
+        logger.info("Merge task {} aborted", taskName);
+        abort();
+        return;
+      }
 
-    states = States.CLEAN_UP;
-    fileTask = null;
-    cleanUp(true);
-    if (logger.isInfoEnabled()) {
-      double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0;
-      double byteRate = totalFileSize / elapsedTime / 1024 / 1024;
-      double seriesRate = unmergedSeries.size() / elapsedTime;
-      double chunkRate = mergeContext.getTotalChunkWritten() / elapsedTime;
-      double fileRate =
-          (resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime;
-      double ptRate = mergeContext.getTotalPointWritten() / elapsedTime;
-      logger.info(
-          "{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
-              + "fileRate: {}/s, ptRate: {}/s",
-          taskName,
-          elapsedTime,
-          byteRate,
-          seriesRate,
-          chunkRate,
-          fileRate,
-          ptRate);
+      states = States.CLEAN_UP;
+      fileTask = null;
+      cleanUp(true);
+      if (logger.isInfoEnabled()) {
+        double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0;
+        double byteRate = totalFileSize / elapsedTime / 1024 / 1024;
+        double seriesRate = unmergedSeries.size() / elapsedTime;
+        double chunkRate = mergeContext.getTotalChunkWritten() / elapsedTime;
+        double fileRate =
+            (resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime;
+        double ptRate = mergeContext.getTotalPointWritten() / elapsedTime;
+        logger.info(
+            "{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
+                + "fileRate: {}/s, ptRate: {}/s",
+            taskName,
+            elapsedTime,
+            byteRate,
+            seriesRate,
+            chunkRate,
+            fileRate,
+            ptRate);
+      }
+    } catch (Exception e) {
+      logger.error("Runtime exception in merge {}", taskName, e);
+      abort();
     }
   }
 
   void cleanUp(boolean executeCallback) throws IOException {
     logger.info("{} is cleaning up", taskName);
 
-    resource.clear();
-    mergeContext.clear();
-
     if (mergeLogger != null) {
       mergeLogger.close();
     }
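
With Callable removed, a MergeTask is now invoked directly and doMerge() handles its own abort on
failure. A minimal sketch of the new call pattern, mirroring the test updates below (fixture names
such as seqResources, tempSGDir and MERGE_TEST_SG come from the tests):

    MergeTask mergeTask =
        new MergeTask(
            new MergeResource(seqResources, unseqResources),
            tempSGDir.getPath(),
            (seqFiles, unseqFiles, mergeLog) -> {},  // no-op MergeCallback; production passes mergeEndAction
            false,                                   // fullMerge
            1,                                       // concurrentMergeSeriesNum
            MERGE_TEST_SG);
    mergeTask.doMerge();                             // formerly mergeTask.call()
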
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index 32cd897..cb1e39e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -53,6 +53,7 @@ public class RecoverMergeTask extends MergeTask {
 
   private LogAnalyzer analyzer;
 
+  // MERGE TODO: get seqFiles and unseqFiles to be recovered from merge.log
   public RecoverMergeTask(
       List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 26261df..75b2175 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -174,8 +174,8 @@ public class StorageGroupProcessor {
   private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
   /** time partition id in the storage group -> tsFileProcessor for this time partition */
   private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
-  /** compactionMergeWorking is used to wait for last compaction to be done. */
-  private volatile boolean compactionMergeWorking = false;
+  /** isCompactionWorking is used to wait for last compaction to be done. */
+  private volatile boolean isCompactionWorking = false;
   // upgrading sequence TsFile resource list
   private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
 
@@ -446,33 +446,37 @@ public class StorageGroupProcessor {
         recoverTsFiles(value, false);
       }
 
-      String taskName =
+      String unseqMergeTaskName =
           logicalStorageGroupName + "-" + virtualStorageGroupId + "-" + System.currentTimeMillis();
       File mergingMods =
           SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MERGING_MODIFICATION_FILE_NAME);
       if (mergingMods.exists()) {
         this.tsFileManagement.mergingModification = new ModificationFile(mergingMods.getPath());
       }
+
+      // MERGE TODO: only pass the tsfileManagement into RecoverMergeTask
       RecoverMergeTask recoverMergeTask =
           new RecoverMergeTask(
               new ArrayList<>(tsFileManagement.getTsFileList(true)),
               tsFileManagement.getTsFileList(false),
               storageGroupSysDir.getPath(),
               tsFileManagement::mergeEndAction,
-              taskName,
-              IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(),
+              unseqMergeTaskName,
+              IoTDBDescriptor.getInstance().getConfig().isFullMerge(),
               logicalStorageGroupName + "-" + virtualStorageGroupId);
       logger.info(
           "{} - {} a RecoverMergeTask {} starts...",
           logicalStorageGroupName,
           virtualStorageGroupId,
-          taskName);
+          unseqMergeTaskName);
       recoverMergeTask.recoverMerge(
           IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
       if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
         mergingMods.delete();
       }
-      recoverCompaction();
+
+      // MERGE TODO: move unseqMergeRecover into TsFileManagement.CompactionRecoverTask()
+      recoverLevelCompaction();
       for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
         long partitionNum = resource.getTimePartition();
         updatePartitionFileVersion(partitionNum, resource.getVersion());
@@ -511,27 +515,25 @@ public class StorageGroupProcessor {
       globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
     }
 
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
-        && seqTsFileResources.size() > 0) {
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
       for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
-        executeCompaction(
-            timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+        executeCompaction(timePartitionId, IoTDBDescriptor.getInstance().getConfig().isFullMerge());
       }
     }
   }
 
-  private void recoverCompaction() {
-    if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      compactionMergeWorking = true;
+  private void recoverLevelCompaction() {
+    if (!CompactionTaskManager.getInstance().isTerminated()) {
+      isCompactionWorking = true;
       logger.info(
           "{} - {} submit a compaction recover merge task",
           logicalStorageGroupName,
           virtualStorageGroupId);
       try {
-        CompactionMergeTaskPoolManager.getInstance()
+        CompactionTaskManager.getInstance()
             .submitTask(
                 logicalStorageGroupName,
-                tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
+                tsFileManagement.new LevelCompactionRecoverTask(this::closeCompactionMergeCallBack));
       } catch (RejectedExecutionException e) {
         this.closeCompactionMergeCallBack(false, 0);
         logger.error(
@@ -1971,20 +1973,20 @@ public class StorageGroupProcessor {
 
     executeCompaction(
         tsFileProcessor.getTimeRangeId(),
-        IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+        IoTDBDescriptor.getInstance().getConfig().isFullMerge());
   }
 
   private void executeCompaction(long timePartition, boolean fullMerge) {
-    if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      compactionMergeWorking = true;
+    if (!isCompactionWorking && !CompactionTaskManager.getInstance().isTerminated()) {
+      isCompactionWorking = true;
       logger.info(
           "{} submit a compaction merge task",
           logicalStorageGroupName + "-" + virtualStorageGroupId);
       try {
         // fork and filter current tsfile, then commit then to compaction merge
         tsFileManagement.forkCurrentFileList(timePartition);
-        tsFileManagement.setForceFullMerge(fullMerge);
-        CompactionMergeTaskPoolManager.getInstance()
+        tsFileManagement.setFullMerge(fullMerge);
+        CompactionTaskManager.getInstance()
             .submitTask(
                 logicalStorageGroupName,
                 tsFileManagement
@@ -2007,9 +2009,9 @@ public class StorageGroupProcessor {
   private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
     if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
       executeCompaction(
-          timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+          timePartitionId, IoTDBDescriptor.getInstance().getConfig().isFullMerge());
     } else {
-      this.compactionMergeWorking = false;
+      this.isCompactionWorking = false;
     }
   }
 
@@ -2842,7 +2844,7 @@ public class StorageGroupProcessor {
     writeLock();
     try {
       // abort ongoing comapctions and merges
-      CompactionMergeTaskPoolManager.getInstance().abortCompaction(logicalStorageGroupName);
+      CompactionTaskManager.getInstance().abortCompaction(logicalStorageGroupName);
       MergeManager.getINSTANCE().abortMerge(logicalStorageGroupName);
       // close all working files that should be removed
       removePartitions(filter, workSequenceTsFileProcessors.entrySet());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 51a64ac..e4458ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
@@ -108,7 +108,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(TVListAllocator.getInstance());
     registerManager.register(CacheHitRatioMonitor.getInstance());
     registerManager.register(MergeManager.getINSTANCE());
-    registerManager.register(CompactionMergeTaskPoolManager.getInstance());
+    registerManager.register(CompactionTaskManager.getInstance());
     JMXService.registerMBean(getInstance(), mbeanName);
     registerManager.register(StorageEngine.getInstance());
     registerManager.register(TemporaryQueryDataFileService.getInstance());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
index cf7441f..1c0dfeb 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
@@ -69,11 +69,10 @@ public class MergeLogTest extends MergeTest {
             new MergeResource(seqResources.subList(0, 1), unseqResources.subList(0, 1)),
             tempSGDir.getPath(),
             this::testCallBack,
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
   }
 
   private void testCallBack(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
index 9842f53..2232521 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
@@ -99,20 +99,11 @@ public class MergeManagerTest extends MergeTest {
     private String progress = "0";
 
     public FakedMainMergeTask(int serialNum) {
-      super(null, null, null, null, false, 0, null);
+      super(null, null, null, false, 0, null);
       this.serialNum = serialNum;
     }
 
     @Override
-    public Void call() {
-      while (!Thread.currentThread().isInterrupted()) {
-        // wait until interrupt
-      }
-      progress = "1";
-      return null;
-    }
-
-    @Override
     public String getStorageGroupName() {
       return "test";
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
index 6c70fe7..0684f46 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
@@ -185,11 +185,10 @@ public class MergeOverLapTest extends MergeTest {
             new MergeResource(seqResources, unseqResources),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             true,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
index ca816cf..84d4c5f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
@@ -48,8 +48,8 @@ public class MergePerfTest extends MergeTest {
     resource.setCacheDeviceMeta(true);
     MergeTask mergeTask =
         new MergeTask(
-            resource, tempSGDir.getPath(), (k, v, l) -> {}, "test", fullMerge, 100, MERGE_TEST_SG);
-    mergeTask.call();
+            resource, tempSGDir.getPath(), (k, v, l) -> {}, fullMerge, 100, MERGE_TEST_SG);
+    mergeTask.doMerge();
     timeConsumption = System.currentTimeMillis() - timeConsumption;
     tearDown();
     FileUtils.deleteDirectory(tempSGDir);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index d4f5dfe..5f7c036 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -79,11 +79,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(seqResources, unseqResources),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -123,11 +122,10 @@ public class MergeTaskTest extends MergeTest {
             (k, v, l) -> {
               assertEquals(499, k.get(2).getEndTime("root.mergeTest.device1"));
             },
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
   }
 
   @Test
@@ -177,11 +175,10 @@ public class MergeTaskTest extends MergeTest {
             (k, v, l) -> {
               assertEquals(49, k.get(0).getEndTime("root.mergeTest.device1"));
             },
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
   }
 
   @Test
@@ -191,11 +188,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(seqResources, unseqResources),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             true,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -235,11 +231,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(seqResources, unseqResources),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -275,11 +270,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(seqResources, unseqResources.subList(0, 1)),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -319,11 +313,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(seqResources, unseqResources.subList(5, 6)),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -359,11 +352,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(seqResources, unseqResources.subList(0, 5)),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -424,11 +416,10 @@ public class MergeTaskTest extends MergeTest {
                 e.printStackTrace();
               }
             },
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -474,11 +465,10 @@ public class MergeTaskTest extends MergeTest {
             new MergeResource(testSeqResources, testUnseqResource),
             tempSGDir.getPath(),
             (k, v, l) -> {},
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
 
     QueryContext context = new QueryContext();
     PartialPath path =
@@ -540,11 +530,10 @@ public class MergeTaskTest extends MergeTest {
             (k, v, l) -> {
               assertEquals(99, k.get(0).getEndTime("root.mergeTest.device1"));
             },
-            "test",
             false,
             1,
             MERGE_TEST_SG);
-    mergeTask.call();
+    mergeTask.doMerge();
   }
 
   private void prepareFileWithLastSensor(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 3d7e2b6..8ad49c7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -610,7 +610,7 @@ public class StorageGroupProcessorTest {
     }
 
     processor.syncCloseAllWorkingTsFileProcessors();
-    processor.merge(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+    processor.merge(IoTDBDescriptor.getInstance().getConfig().isFullMerge());
     while (processor.getTsFileManagement().isUnseqMerging) {
       // wait
     }
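
At this call site only the config getter's name changes: isForceFullMerge() becomes isFullMerge(), with the same boolean handed to merge(). A minimal sketch mirroring the updated test:

    // Sketch -- mirrors the updated StorageGroupProcessorTest call site above.
    boolean fullMerge = IoTDBDescriptor.getInstance().getConfig().isFullMerge();
    processor.merge(fullMerge); // getter was previously named isForceFullMerge()
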
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index d01b9d9..289ce56 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.integration;
 
 import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
@@ -379,7 +379,7 @@ public class IoTDBRestartIT {
     }
 
     try {
-      CompactionMergeTaskPoolManager.getInstance().waitAllCompactionFinish();
+      CompactionTaskManager.getInstance().waitAllCompactionFinish();
       Thread.sleep(10000);
       EnvironmentUtils.restartDaemon();
     } catch (Exception e) {
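
The IoTDBRestartIT hunks above and the EnvironmentUtils hunks below pick up a class rename: CompactionMergeTaskPoolManager becomes CompactionTaskManager, while the singleton call itself is unchanged. A minimal sketch of the renamed usage:

    // Sketch -- only the class name changes at these call sites.
    import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;

    // wait for all running compactions before restarting or cleaning the test env
    CompactionTaskManager.getInstance().waitAllCompactionFinish();
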
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index fc29451..9892780 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.TriggerManagementException;
@@ -86,7 +86,7 @@ public class EnvironmentUtils {
 
   public static void cleanEnv() throws IOException, StorageEngineException {
     // wait all compaction finished
-    CompactionMergeTaskPoolManager.getInstance().waitAllCompactionFinish();
+    CompactionTaskManager.getInstance().waitAllCompactionFinish();
 
     // deregister all user defined classes
     try {