You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/09 15:47:40 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12] Avoid Compaction blocking Flush (Fix CI not stop problem) (#3695)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new e9aea37  [To rel/0.12] Avoid Compaction blocking Flush (Fix CI not stop problem)  (#3695)
e9aea37 is described below

commit e9aea37a0f8e00b5455a664a2e08ee58022a9f80
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Mon Aug 9 23:47:12 2021 +0800

    [To rel/0.12] Avoid Compaction blocking Flush (Fix CI not stop problem)  (#3695)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  4 +-
 .../compaction/CompactionMergeTaskPoolManager.java | 41 +++-------
 .../db/engine/compaction/TsFileManagement.java     | 32 ++++----
 .../level/LevelCompactionTsFileManagement.java     | 12 ++-
 .../iotdb/db/engine/merge/manage/MergeManager.java | 16 ----
 .../merge/task/CompactionMergeRecoverTask.java     | 91 ++++++++++++++++++++++
 .../iotdb/db/engine/merge/task/MergeFileTask.java  |  8 +-
 .../db/engine/merge/task/RecoverMergeTask.java     |  4 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 87 ++++++---------------
 .../version/SimpleFileVersionController.java       |  2 +-
 .../db/integration/IoTDBAutoCreateSchemaIT.java    |  4 +-
 .../db/integration/IoTDBCreateStorageGroupIT.java  |  2 +-
 .../db/integration/IoTDBCreateTimeseriesIT.java    |  4 +-
 .../db/integration/IoTDBNewTsFileCompactionIT.java | 58 +++++++++++++-
 .../aggregation/IoTDBAggregationSmallDataIT.java   |  5 +-
 .../apache/iotdb/db/writelog/PerformanceTest.java  |  2 +-
 17 files changed, 227 insertions(+), 147 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 999b240..7e7be3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -384,7 +384,7 @@ public class IoTDBConfig {
   private int queryTimeoutThreshold = 60000;
 
   /** compaction interval in ms */
-  private long compactionInterval = 30000;
+  private long compactionInterval = 10000;
 
   /** Replace implementation class of JDBC service */
   private String rpcImplClassName = TSServiceImpl.class.getName();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index b4f79d3..96933e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -284,9 +284,7 @@ public class StorageEngine implements IService {
                       storageGroup.getFullPath());
                 } catch (Exception e) {
                   logger.error(
-                      "meet error when recovering storage group: {}",
-                      storageGroup.getFullPath(),
-                      e);
+                      "meet error when recovered storage group: {}", storageGroup.getFullPath(), e);
                 }
                 return null;
               }));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index e347300..cdbf180 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -37,11 +37,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
@@ -53,8 +51,7 @@ public class CompactionMergeTaskPoolManager implements IService {
       LoggerFactory.getLogger(CompactionMergeTaskPoolManager.class);
   private static final CompactionMergeTaskPoolManager INSTANCE =
       new CompactionMergeTaskPoolManager();
-  private ScheduledExecutorService scheduledPool;
-  private ExecutorService pool;
+  private ScheduledThreadPoolExecutor pool;
   private Map<String, List<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
@@ -68,13 +65,10 @@ public class CompactionMergeTaskPoolManager implements IService {
   public void start() {
     if (pool == null) {
       this.pool =
-          IoTDBThreadPoolFactory.newFixedThreadPool(
-              IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
-              ThreadName.COMPACTION_SERVICE.getName());
-      this.scheduledPool =
-          IoTDBThreadPoolFactory.newScheduledThreadPool(
-              IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
-              ThreadName.COMPACTION_SERVICE.getName());
+          (ScheduledThreadPoolExecutor)
+              IoTDBThreadPoolFactory.newScheduledThreadPool(
+                  IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
+                  ThreadName.COMPACTION_SERVICE.getName());
     }
     logger.info("Compaction task manager started.");
   }
@@ -83,7 +77,6 @@ public class CompactionMergeTaskPoolManager implements IService {
   public void stop() {
     if (pool != null) {
       pool.shutdownNow();
-      scheduledPool.shutdownNow();
       logger.info("Waiting for task pool to shut down");
       waitTermination();
       storageGroupTasks.clear();
@@ -93,7 +86,6 @@ public class CompactionMergeTaskPoolManager implements IService {
   @Override
   public void waitAndStop(long milliseconds) {
     if (pool != null) {
-      awaitTermination(scheduledPool, milliseconds);
       awaitTermination(pool, milliseconds);
       logger.info("Waiting for task pool to shut down");
       waitTermination();
@@ -149,7 +141,6 @@ public class CompactionMergeTaskPoolManager implements IService {
       }
     }
     pool = null;
-    scheduledPool = null;
     storageGroupTasks.clear();
     logger.info("CompactionManager stopped");
   }
@@ -195,25 +186,13 @@ public class CompactionMergeTaskPoolManager implements IService {
   }
 
   public void init(Runnable function) {
-    scheduledPool.scheduleWithFixedDelay(
+    pool.scheduleWithFixedDelay(
         function, 1000, config.getCompactionInterval(), TimeUnit.MILLISECONDS);
   }
 
-  public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask)
-      throws RejectedExecutionException {
-    if (pool != null && !pool.isTerminated()) {
-      String storageGroup = storageGroupCompactionTask.getStorageGroupName();
-      boolean isCompacting = sgCompactionStatus.computeIfAbsent(storageGroup, k -> false);
-      if (isCompacting) {
-        return;
-      }
-      storageGroupCompactionTask.setSgCompactionStatus(sgCompactionStatus);
-      sgCompactionStatus.put(storageGroup, true);
-      Future<Void> future = pool.submit(storageGroupCompactionTask);
-      storageGroupTasks
-          .computeIfAbsent(storageGroup, k -> new CopyOnWriteArrayList<>())
-          .add(future);
-    }
+  @TestOnly
+  public synchronized int getCompactionTaskNum() {
+    return pool.getActiveCount();
   }
 
   public boolean isTerminated() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index dde54a7..838006c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -63,14 +63,13 @@ public abstract class TsFileManagement {
 
   public volatile boolean isUnseqMerging = false;
   public volatile boolean isSeqMerging = false;
+  public volatile boolean recovered = false;
   /**
    * This is the modification file of the result of the current merge. Because the merged file may
    * be invisible at this moment, without this, deletion/update during merge could be lost.
    */
   public ModificationFile mergingModification;
 
-  private long mergeStartTime;
-
   /** whether execute merge chunk in this task */
   protected boolean isMergeExecutedInCurrentTask = false;
 
@@ -258,7 +257,7 @@ public abstract class TsFileManagement {
           tsFileResource.setMerging(true);
         }
 
-        mergeStartTime = System.currentTimeMillis();
+        long mergeStartTime = System.currentTimeMillis();
         MergeTask mergeTask =
             new MergeTask(
                 mergeResource,
@@ -279,17 +278,6 @@ public abstract class TsFileManagement {
               mergeFiles[0].size(),
               mergeFiles[1].size());
         }
-        // wait until unseq merge has finished
-        while (isUnseqMerging) {
-          try {
-            Thread.sleep(200);
-          } catch (InterruptedException e) {
-            logger.error("{} [Compaction] shutdown", storageGroupName, e);
-            Thread.currentThread().interrupt();
-            return false;
-          }
-        }
-        return true;
       } catch (MergeException | IOException e) {
         logger.error("{} cannot select file for merge", storageGroupName, e);
         return false;
@@ -297,6 +285,17 @@ public abstract class TsFileManagement {
     } finally {
       writeUnlock();
     }
+    // wait until unseq merge has finished
+    while (isUnseqMerging) {
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException e) {
+        logger.error("{} [Compaction] shutdown", storageGroupName, e);
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+    return true;
   }
 
   private IMergeFileSelector getMergeFileSelector(long budget, MergeResource resource) {
@@ -423,7 +422,10 @@ public abstract class TsFileManagement {
         File mergedFile =
             FSFactoryProducer.getFSFactory().getFile(seqFile.getTsFilePath() + MERGE_SUFFIX);
         if (mergedFile.exists()) {
-          mergedFile.delete();
+          boolean deletionSuccess = mergedFile.delete();
+          if (!deletionSuccess) {
+            logger.warn("fail to delete {}", mergedFile);
+          }
         }
         updateMergeModification(seqFile);
       } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 5f260f3..52141cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -444,6 +444,13 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         if (fullMerge) {
           // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
           TsFileResource targetTsFileResource = getRecoverTsFileResource(targetFile, isSeq);
+          if (targetTsFileResource == null) {
+            targetTsFileResource = getTsFileResource(targetFile, isSeq);
+            if (targetTsFileResource == null) {
+              logger.warn("get null targetTsFileResource");
+              return;
+            }
+          }
           long timePartition = targetTsFileResource.getTimePartition();
           RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
           // if not complete compaction, resume merge
@@ -612,6 +619,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
 
   @Override
   protected void merge(long timePartition) {
+    // compacting sequence file in one time partition
     isMergeExecutedInCurrentTask =
         merge(
             forkedSequenceTsFileResources, true, timePartition, seqLevelNum, seqFileNumInEachLevel);
@@ -888,7 +896,9 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           if (targetFile.exists()) {
             logger.error(
                 "{} restore delete target file {} ", storageGroupName, targetFile.getName());
-            targetFile.delete();
+            if (!targetFile.delete()) {
+              logger.warn("fail to delete {}", targetFile);
+            }
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index f9112e1..aef6e2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -68,7 +68,6 @@ public class MergeManager implements IService, MergeManagerMBean {
   private AtomicInteger threadCnt = new AtomicInteger();
   private ThreadPoolExecutor mergeTaskPool;
   private ThreadPoolExecutor mergeChunkSubTaskPool;
-  private ScheduledExecutorService timedMergeThreadPool;
   private ScheduledExecutorService taskCleanerThreadPool;
 
   private Map<String, Set<MergeFuture>> storageGroupMainTasks = new ConcurrentHashMap<>();
@@ -143,13 +142,6 @@ public class MergeManager implements IService, MergeManagerMBean {
           new MergeThreadPool(
               threadNum * chunkSubThreadNum,
               r -> new Thread(r, "MergeChunkSubThread-" + threadCnt.getAndIncrement()));
-      long mergeInterval = IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec();
-      if (mergeInterval > 0) {
-        timedMergeThreadPool =
-            Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "TimedMergeThread"));
-        timedMergeThreadPool.scheduleAtFixedRate(
-            this::mergeAll, mergeInterval, mergeInterval, TimeUnit.SECONDS);
-      }
 
       taskCleanerThreadPool =
           Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "MergeTaskCleaner"));
@@ -161,10 +153,6 @@ public class MergeManager implements IService, MergeManagerMBean {
   @Override
   public void stop() {
     if (mergeTaskPool != null) {
-      if (timedMergeThreadPool != null) {
-        timedMergeThreadPool.shutdownNow();
-        timedMergeThreadPool = null;
-      }
       taskCleanerThreadPool.shutdownNow();
       taskCleanerThreadPool = null;
       mergeTaskPool.shutdownNow();
@@ -199,10 +187,6 @@ public class MergeManager implements IService, MergeManagerMBean {
   @Override
   public void waitAndStop(long milliseconds) {
     if (mergeTaskPool != null) {
-      if (timedMergeThreadPool != null) {
-        awaitTermination(timedMergeThreadPool, milliseconds);
-        timedMergeThreadPool = null;
-      }
       awaitTermination(taskCleanerThreadPool, milliseconds);
       taskCleanerThreadPool = null;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
new file mode 100644
index 0000000..8385934
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.TsFileManagement;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+public class CompactionMergeRecoverTask implements Runnable {
+
+  private static final Logger logger = LoggerFactory.getLogger(CompactionMergeRecoverTask.class);
+
+  private TsFileManagement.CompactionRecoverTask compactionRecoverTask;
+  private RecoverMergeTask recoverMergeTask;
+  private TsFileManagement tsFileManagement;
+  private String storageGroupSysDir;
+  private String storageGroupName;
+
+  public CompactionMergeRecoverTask(
+      TsFileManagement tsFileManagement,
+      List<TsFileResource> seqFiles,
+      List<TsFileResource> unseqFiles,
+      String storageGroupSysDir,
+      MergeCallback callback,
+      String taskName,
+      boolean fullMerge,
+      String storageGroupName,
+      StorageGroupProcessor.CloseCompactionMergeCallBack closeCompactionMergeCallBack) {
+    this.tsFileManagement = tsFileManagement;
+    this.compactionRecoverTask =
+        this.tsFileManagement.new CompactionRecoverTask(closeCompactionMergeCallBack);
+    this.storageGroupSysDir = storageGroupSysDir;
+    this.storageGroupName = storageGroupName;
+    this.recoverMergeTask =
+        new RecoverMergeTask(
+            seqFiles,
+            unseqFiles,
+            storageGroupSysDir,
+            callback,
+            taskName,
+            fullMerge,
+            storageGroupName);
+  }
+
+  @Override
+  public void run() {
+    tsFileManagement.recovered = false;
+    try {
+      recoverMergeTask.recoverMerge(
+          IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
+      File mergingMods =
+          SystemFileFactory.INSTANCE.getFile(
+              storageGroupSysDir, StorageGroupProcessor.MERGING_MODIFICATION_FILE_NAME);
+      if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
+        mergingMods.delete();
+      }
+    } catch (MetadataException | IOException e) {
+      logger.error(e.getMessage(), e);
+    }
+    compactionRecoverTask.call();
+    tsFileManagement.recovered = true;
+    logger.info("{} Compaction recover finish", storageGroupName);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 0c4241b..34735c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -207,7 +207,9 @@ public class MergeFileTask {
       mergeLogger.logFileMergeEnd();
       logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);
 
-      newFileWriter.getFile().delete();
+      if (!newFileWriter.getFile().delete()) {
+        logger.warn("fail to delete {}", newFileWriter.getFile());
+      }
       // change tsFile name
       File nextMergeVersionFile = modifyTsFileNameUnseqMergCnt(seqFile.getTsFile());
       fsFactory.moveFile(seqFile.getTsFile(), nextMergeVersionFile);
@@ -358,7 +360,9 @@ public class MergeFileTask {
       FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
 
       // change tsFile name
-      seqFile.getTsFile().delete();
+      if (!seqFile.getTsFile().delete()) {
+        logger.warn("fail to delete {}", seqFile.getTsFile());
+      }
       File nextMergeVersionFile = modifyTsFileNameUnseqMergCnt(seqFile.getTsFile());
       fsFactory.moveFile(fileWriter.getFile(), nextMergeVersionFile);
       fsFactory.moveFile(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index 32cd897..976c374 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -205,11 +205,11 @@ public class RecoverMergeTask extends MergeTask {
   // scan the metadata to compute how many chunks are merged/unmerged so at last we can decide to
   // move the merged chunks or the unmerged chunks
   private void recoverChunkCounts() throws IOException {
-    logger.info("{} recovering chunk counts", taskName);
+    logger.info("{} recovered chunk counts", taskName);
     int fileCnt = 1;
     for (TsFileResource tsFileResource : resource.getSeqFiles()) {
       logger.info(
-          "{} recovering {}  {}/{}",
+          "{} recovered {}  {}/{}",
           taskName,
           tsFileResource.getTsFile().getName(),
           fileCnt,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 789092a..840a482 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.engine.flush.CloseFileListener;
 import org.apache.iotdb.db.engine.flush.FlushListener;
 import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
-import org.apache.iotdb.db.engine.merge.task.RecoverMergeTask;
+import org.apache.iotdb.db.engine.merge.task.CompactionMergeRecoverTask;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -103,7 +103,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -487,27 +486,21 @@ public class StorageGroupProcessor {
         this.tsFileManagement.mergingModification = new ModificationFile(mergingMods.getPath());
       }
 
-      RecoverMergeTask recoverMergeTask =
-          new RecoverMergeTask(
+      CompactionMergeRecoverTask recoverTask =
+          new CompactionMergeRecoverTask(
+              tsFileManagement,
               new ArrayList<>(tsFileManagement.getTsFileList(true)),
               tsFileManagement.getTsFileList(false),
               storageGroupSysDir.getPath(),
               tsFileManagement::mergeEndAction,
               taskName,
               IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(),
-              logicalStorageGroupName);
-      logger.info(
-          "{} - {} a RecoverMergeTask {} starts...",
-          logicalStorageGroupName,
-          virtualStorageGroupId,
-          taskName);
-      recoverMergeTask.recoverMerge(
-          IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
-      if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
-        mergingMods.delete();
-      }
-      recoverCompaction();
-    } catch (IOException | MetadataException e) {
+              logicalStorageGroupName,
+              this::closeCompactionRecoverCallBack);
+
+      new Thread(recoverTask).start();
+      logger.info("submit a compaction merge recover task");
+    } catch (IOException e) {
       throw new StorageGroupProcessorException(e);
     }
   }
@@ -529,35 +522,6 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void recoverCompaction() {
-    if (IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
-        == CompactionStrategy.NO_COMPACTION) {
-      return;
-    }
-    if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      logger.info(
-          "{} - {} submit a compaction recover merge task",
-          logicalStorageGroupName,
-          virtualStorageGroupId);
-      try {
-        CompactionMergeTaskPoolManager.getInstance()
-            .submitTask(
-                tsFileManagement.new CompactionRecoverTask(this::closeCompactionRecoverCallBack));
-      } catch (RejectedExecutionException e) {
-        this.closeCompactionRecoverCallBack(false, 0);
-        logger.error(
-            "{} - {} compaction submit task failed",
-            logicalStorageGroupName,
-            virtualStorageGroupId,
-            e);
-      }
-    } else {
-      logger.error(
-          "{} compaction pool not started ,recover failed",
-          logicalStorageGroupName + "-" + virtualStorageGroupId);
-    }
-  }
-
   private void updatePartitionFileVersion(long partitionNum, long fileVersion) {
     long oldVersion = partitionMaxFileVersions.getOrDefault(partitionNum, 0L);
     if (fileVersion > oldVersion) {
@@ -626,7 +590,7 @@ public class StorageGroupProcessor {
       }
 
       // old version
-      // some TsFileResource may be being persisted when the system crashed, try recovering such
+      // some TsFileResource may be being persisted when the system crashed, try to recover such
       // resources
       continueFailedRenames(fileFolder, TEMP_SUFFIX);
 
@@ -640,7 +604,7 @@ public class StorageGroupProcessor {
           if (!partitionFolder.isDirectory()) {
             logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
           } else if (!partitionFolder.getName().equals(IoTDBConstant.UPGRADE_FOLDER_NAME)) {
-            // some TsFileResource may be being persisted when the system crashed, try recovering
+            // some TsFileResource may be being persisted when the system crashed, try to recover
             // such
             // resources
             continueFailedRenames(partitionFolder, TEMP_SUFFIX);
@@ -1956,14 +1920,6 @@ public class StorageGroupProcessor {
     logger.info(
         "signal closing storage group condition in {}",
         logicalStorageGroupName + "-" + virtualStorageGroupId);
-
-    if (IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
-        == CompactionStrategy.LEVEL_COMPACTION) {
-      CompactionMergeTaskPoolManager.getInstance()
-          .submitTask(
-              new CompactionOnePartitionTask(
-                  logicalStorageGroupName, tsFileProcessor.getTimeRangeId()));
-    }
   }
 
   public class CompactionOnePartitionTask extends StorageGroupCompactionTask {
@@ -2005,14 +1961,12 @@ public class StorageGroupProcessor {
       return;
     }
     CompactionMergeTaskPoolManager.getInstance().clearCompactionStatus(logicalStorageGroupName);
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
-      logger.info(
-          "{}-{} recover finished, submit continuous compaction task",
-          logicalStorageGroupName,
-          virtualStorageGroupId);
+    logger.info(
+        "{}-{} recover finished, submit scheduled compaction task",
+        logicalStorageGroupName,
+        virtualStorageGroupId);
 
-      CompactionMergeTaskPoolManager.getInstance().init(this::merge);
-    }
+    CompactionMergeTaskPoolManager.getInstance().init(this::merge);
   }
 
   /** close compaction merge callback, to release some locks */
@@ -2119,9 +2073,12 @@ public class StorageGroupProcessor {
   }
 
   public void merge() {
+    if (!tsFileManagement.recovered) {
+      // the recover task has not finished yet, skip this round
+      return;
+    }
     if (config.getCompactionStrategy() == CompactionStrategy.LEVEL_COMPACTION) {
-      CompactionMergeTaskPoolManager.getInstance()
-          .submitTask(new CompactionAllPartitionTask(logicalStorageGroupName));
+      new CompactionAllPartitionTask(logicalStorageGroupName).call();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
index 0a73f16..aafe0ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
@@ -36,7 +36,7 @@ public class SimpleFileVersionController implements VersionController {
   public static final String UPGRADE_DIR = "upgrade";
   /**
    * Every time currVersion - prevVersion >= saveInterval, currVersion is persisted and prevVersion
-   * is set to currVersion. When recovering from file, the version number is automatically increased
+   * is set to currVersion. On recovery from file, the version number is automatically increased
    * by saveInterval to avoid conflicts.
    */
   private static long saveInterval = 100;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAutoCreateSchemaIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAutoCreateSchemaIT.java
index abac508..912fa33 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAutoCreateSchemaIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAutoCreateSchemaIT.java
@@ -174,7 +174,7 @@ public class IoTDBAutoCreateSchemaIT {
     EnvironmentUtils.stopDaemon();
     setUp();
 
-    // ensure that insert data in cache is right after recovering.
+    // ensure that insert data in cache is right after recovery.
     insertAutoCreate1Tool();
   }
 
@@ -222,7 +222,7 @@ public class IoTDBAutoCreateSchemaIT {
     EnvironmentUtils.stopDaemon();
     setUp();
 
-    // ensure that storage group in cache is right after recovering.
+    // ensure that storage group in cache is right after recovery.
     InsertAutoCreate2Tool(storageGroup, timeSeriesPrefix);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateStorageGroupIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateStorageGroupIT.java
index 28d75d1..4333199 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateStorageGroupIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateStorageGroupIT.java
@@ -75,7 +75,7 @@ public class IoTDBCreateStorageGroupIT {
     EnvironmentUtils.stopDaemon();
     setUp();
 
-    // ensure StorageGroup in cache is right after recovering.
+    // ensure StorageGroup in cache is right after recovery.
     createStorageGroupTool(storageGroups);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateTimeseriesIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateTimeseriesIT.java
index 5174419..ba076bb 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateTimeseriesIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateTimeseriesIT.java
@@ -82,7 +82,7 @@ public class IoTDBCreateTimeseriesIT {
     EnvironmentUtils.stopDaemon();
     setUp();
 
-    // ensure timeseries in cache is right after recovering.
+    // ensure timeseries in cache is right after recovery.
     createTimeSeries1Tool(timeSeriesArray);
   }
 
@@ -129,7 +129,7 @@ public class IoTDBCreateTimeseriesIT {
     EnvironmentUtils.stopDaemon();
     setUp();
 
-    // ensure storage group in cache is right after recovering.
+    // ensure storage group in cache is right after recovery.
     createTimeSeries2Tool(storageGroup);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
index c327ef1..7bc0c81 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.integration;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
 import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
@@ -32,6 +33,8 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -46,6 +49,8 @@ import static org.junit.Assert.fail;
 
 public class IoTDBNewTsFileCompactionIT {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBNewTsFileCompactionIT.class);
+
   private int prevSeqLevelFileNum;
   private int prevSeqLevelNum;
   private int prevMergePagePointNumber;
@@ -53,7 +58,7 @@ public class IoTDBNewTsFileCompactionIT {
   private CompactionStrategy preCompactionStrategy;
   private PartialPath storageGroupPath;
   // the unit is ns
-  private static final long MAX_WAIT_TIME_FOR_MERGE = Long.MAX_VALUE;
+  private static final long MAX_WAIT_TIME_FOR_MERGE = 1L * 60L * 1000L * 1000L * 1000L;
   private static final float FLOAT_DELTA = 0.00001f;
 
   @Before
@@ -74,6 +79,7 @@ public class IoTDBNewTsFileCompactionIT {
     IoTDBDescriptor.getInstance()
         .getConfig()
         .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+    IoTDBDescriptor.getInstance().getConfig().setCompactionThreadNum(10);
     EnvironmentUtils.envSetUp();
     Class.forName(Config.JDBC_DRIVER_NAME);
 
@@ -127,7 +133,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -181,7 +189,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -234,7 +244,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -293,7 +305,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(5, 5)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -347,7 +361,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -406,7 +422,9 @@ public class IoTDBNewTsFileCompactionIT {
 
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -461,7 +479,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -523,7 +543,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -578,7 +600,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -635,7 +659,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -697,7 +723,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -759,7 +787,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -819,7 +849,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(5, 5)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -882,7 +914,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -945,7 +979,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -1011,7 +1047,9 @@ public class IoTDBNewTsFileCompactionIT {
       statement.execute("INSERT INTO root.sg1.d1(time,s1) values(8, 8)");
       statement.execute("FLUSH");
 
+      LOGGER.warn("Waiting for merge to finish");
       assertTrue(waitForMergeFinish());
+      LOGGER.warn("Merge Finish");
 
       int cnt;
       try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
@@ -1043,14 +1081,32 @@ public class IoTDBNewTsFileCompactionIT {
         (LevelCompactionTsFileManagement) storageGroupProcessor.getTsFileManagement();
 
     long startTime = System.nanoTime();
+    long intervalTime = startTime;
     // get the size of level 1's tsfile list to judge whether merge is finished
     while (tsFileManagement.getSequenceTsFileResources().get(0L).size() < 2
         || tsFileManagement.getSequenceTsFileResources().get(0L).get(1).size() != 1) {
       TimeUnit.MILLISECONDS.sleep(100);
       // wait too long, just break
       if ((System.nanoTime() - startTime) >= MAX_WAIT_TIME_FOR_MERGE) {
+        LOGGER.error("Unable to wait for compaction finish");
+        fail();
         break;
       }
+      if ((System.nanoTime() - intervalTime) >= 20L * 1000L * 1000L * 1000L) {
+        intervalTime = System.nanoTime();
+        LOGGER.warn(
+            "The number of tsfile level: {}",
+            tsFileManagement.getSequenceTsFileResources().get(0L).size());
+        LOGGER.warn(
+            "The number of tsfile in level 0: {}",
+            tsFileManagement.getSequenceTsFileResources().get(0L).get(0).size());
+        LOGGER.warn(
+            "The number of tsfile in level 1: {}",
+            tsFileManagement.getSequenceTsFileResources().get(0L).get(1).size());
+        LOGGER.warn(
+            "The number of current compaction task num {}",
+            CompactionMergeTaskPoolManager.getInstance().getCompactionTaskNum());
+      }
     }
     return tsFileManagement.getSequenceTsFileResources().get(0L).get(1).size() == 1;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java
index a077104..cca305d 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java
@@ -225,9 +225,8 @@ public class IoTDBAggregationSmallDataIT {
         fail();
       } catch (IoTDBSQLException e) {
         Assert.assertTrue(
-            e.toString()
-                .contains(
-                    "500: [INTERNAL_SERVER_ERROR] Exception occurred while executing executeStatement. Binary statistics does not support: max"));
+            e.toString().contains("500: [INTERNAL_SERVER_ERROR] Exception occurred while executing")
+                && e.toString().contains("Binary statistics does not support: max"));
       }
 
       boolean hasResultSet =
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
index 6a1df39..e726496 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java
@@ -205,7 +205,7 @@ public class PerformanceTest {
       logNode.forceSync();
       long time = System.currentTimeMillis();
       System.out.println(
-          3000000 + " logs use " + (System.currentTimeMillis() - time) + "ms when recovering ");
+          3000000 + " logs use " + (System.currentTimeMillis() - time) + "ms when recovered ");
     } finally {
       ByteBuffer[] array = logNode.delete();
       for (ByteBuffer byteBuffer : array) {