You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/12/29 10:03:21 UTC

[iotdb] branch rel/0.11 updated: [To rel/0.11] compaction not block flush (#2364)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 093a9e5  [To rel/0.11] compaction not block flush (#2364)
093a9e5 is described below

commit 093a9e5141fa0bcd50f79c76998e4d292c18c800
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Tue Dec 29 18:03:07 2020 +0800

    [To rel/0.11] compaction not block flush (#2364)
    
    * compaction_not_block_flush_to_0.11
    
    * remove useless
    
    Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
 .../compaction/CompactionMergeTaskPoolManager.java |  26 ++
 .../db/engine/compaction/TsFileManagement.java     |   5 -
 .../level/LevelCompactionTsFileManagement.java     | 319 ++++++++++++---------
 .../no/NoCompactionTsFileManagement.java           |   5 -
 .../engine/compaction/utils/CompactionUtils.java   |   4 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  19 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   3 +
 7 files changed, 213 insertions(+), 168 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 3473d54..218be58 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.engine.compaction;
 
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
+
+import java.io.File;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -27,6 +30,9 @@ import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,6 +80,26 @@ public class CompactionMergeTaskPoolManager implements IService {
     }
   }
 
+  @TestOnly
+  public void waitAllCompactionFinish() {
+    if (pool != null) {
+      File sgDir = FSFactoryProducer.getFSFactory().getFile(
+          FilePathUtils.regularizePath(IoTDBDescriptor.getInstance().getConfig().getSystemDir())
+              + "storage_groups");
+      File[] subDirList = sgDir.listFiles();
+      if(subDirList!=null) {
+        for (File subDir : subDirList) {
+          while (FSFactoryProducer.getFSFactory().getFile(
+              subDir.getAbsoluteFile() + File.separator + subDir.getName() + COMPACTION_LOG_NAME)
+              .exists()) {
+            // wait
+          }
+        }
+      }
+      logger.info("All compaction task finish");
+    }
+  }
+
   private void waitTermination() {
     long startTime = System.currentTimeMillis();
     while (!pool.isTerminated()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index f7f68c3..096abec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -74,11 +74,6 @@ public abstract class TsFileManagement {
   }
 
   /**
-   * get the TsFile list which has been completed hot compacted
-   */
-  public abstract List<TsFileResource> getStableTsFileList(boolean sequence);
-
-  /**
    * get the TsFile list in sequence
    */
   public abstract List<TsFileResource> getTsFileList(boolean sequence);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index e6ff2ec..33e4e11 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -31,21 +31,24 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -60,21 +63,21 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   private static final Logger logger = LoggerFactory
       .getLogger(LevelCompactionTsFileManagement.class);
 
-
-  private final int seqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum();
-  private final int seqFileNumInEachLevel = IoTDBDescriptor.getInstance().getConfig()
-      .getSeqFileNumInEachLevel();
-  private final int unseqLevelNum = IoTDBDescriptor.getInstance().getConfig()
-      .getUnseqLevelNum();
-  private final int unseqFileNumInEachLevel = IoTDBDescriptor.getInstance().getConfig()
-      .getSeqFileNumInEachLevel();
+  private final int seqLevelNum = Math
+      .max(IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum(), 1);
+  private final int seqFileNumInEachLevel = Math
+      .max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
+  private final int unseqLevelNum = Math
+      .max(IoTDBDescriptor.getInstance().getConfig().getUnseqLevelNum(), 1);
+  private final int unseqFileNumInEachLevel = Math
+      .max(IoTDBDescriptor.getInstance().getConfig().getUnseqFileNumInEachLevel(), 1);
 
   private final boolean enableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig()
       .isEnableUnseqCompaction();
   private final boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig()
       .isForceFullMerge();
   // First map is partition list; Second list is level list; Third list is file list in level;
-  private final Map<Long, List<TreeSet<TsFileResource>>> sequenceTsFileResources = new ConcurrentSkipListMap<>();
+  private final Map<Long, List<SortedSet<TsFileResource>>> sequenceTsFileResources = new ConcurrentSkipListMap<>();
   private final Map<Long, List<List<TsFileResource>>> unSequenceTsFileResources = new ConcurrentSkipListMap<>();
   private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>();
   private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>();
@@ -99,13 +102,17 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     if (sequence) {
       if (sequenceTsFileResources.containsKey(timePartitionId)) {
         if (sequenceTsFileResources.get(timePartitionId).size() > level) {
-          sequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
+          synchronized (sequenceTsFileResources) {
+            sequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
+          }
         }
       }
     } else {
       if (unSequenceTsFileResources.containsKey(timePartitionId)) {
         if (unSequenceTsFileResources.get(timePartitionId).size() > level) {
-          unSequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
+          synchronized (unSequenceTsFileResources) {
+            unSequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
+          }
         }
       }
     }
@@ -126,33 +133,23 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   }
 
   @Override
-  public List<TsFileResource> getStableTsFileList(boolean sequence) {
-    List<TsFileResource> result = new ArrayList<>();
-    if (sequence) {
-      for (List<TreeSet<TsFileResource>> sequenceTsFileList : sequenceTsFileResources.values()) {
-        result.addAll(sequenceTsFileList.get(seqLevelNum - 1));
-      }
-    } else {
-      for (List<List<TsFileResource>> unSequenceTsFileList : unSequenceTsFileResources.values()) {
-        result.addAll(unSequenceTsFileList.get(unseqLevelNum - 1));
-      }
-    }
-    return result;
-  }
-
-  @Override
   public List<TsFileResource> getTsFileList(boolean sequence) {
     List<TsFileResource> result = new ArrayList<>();
     if (sequence) {
-      for (List<TreeSet<TsFileResource>> sequenceTsFileList : sequenceTsFileResources.values()) {
-        for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
-          result.addAll(sequenceTsFileList.get(i));
+      synchronized (sequenceTsFileResources) {
+        for (List<SortedSet<TsFileResource>> sequenceTsFileList : sequenceTsFileResources
+            .values()) {
+          for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
+            result.addAll(sequenceTsFileList.get(i));
+          }
         }
       }
     } else {
-      for (List<List<TsFileResource>> unSequenceTsFileList : unSequenceTsFileResources.values()) {
-        for (int i = unSequenceTsFileList.size() - 1; i >= 0; i--) {
-          result.addAll(unSequenceTsFileList.get(i));
+      synchronized (unSequenceTsFileResources) {
+        for (List<List<TsFileResource>> unSequenceTsFileList : unSequenceTsFileResources.values()) {
+          for (int i = unSequenceTsFileList.size() - 1; i >= 0; i--) {
+            result.addAll(unSequenceTsFileList.get(i));
+          }
         }
       }
     }
@@ -167,14 +164,18 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   @Override
   public void remove(TsFileResource tsFileResource, boolean sequence) {
     if (sequence) {
-      for (TreeSet<TsFileResource> sequenceTsFileResource : sequenceTsFileResources
-          .get(tsFileResource.getTimePartition())) {
-        sequenceTsFileResource.remove(tsFileResource);
+      synchronized (sequenceTsFileResources) {
+        for (SortedSet<TsFileResource> sequenceTsFileResource : sequenceTsFileResources
+            .get(tsFileResource.getTimePartition())) {
+          sequenceTsFileResource.remove(tsFileResource);
+        }
       }
     } else {
-      for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources
-          .get(tsFileResource.getTimePartition())) {
-        unSequenceTsFileResource.remove(tsFileResource);
+      synchronized (unSequenceTsFileResources) {
+        for (List<TsFileResource> unSequenceTsFileResource : unSequenceTsFileResources
+            .get(tsFileResource.getTimePartition())) {
+          unSequenceTsFileResource.remove(tsFileResource);
+        }
       }
     }
   }
@@ -182,17 +183,21 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   @Override
   public void removeAll(List<TsFileResource> tsFileResourceList, boolean sequence) {
     if (sequence) {
-      for (List<TreeSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources
-          .values()) {
-        for (TreeSet<TsFileResource> levelTsFileResource : partitionSequenceTsFileResource) {
-          levelTsFileResource.removeAll(tsFileResourceList);
+      synchronized (sequenceTsFileResources) {
+        for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources
+            .values()) {
+          for (SortedSet<TsFileResource> levelTsFileResource : partitionSequenceTsFileResource) {
+            levelTsFileResource.removeAll(tsFileResourceList);
+          }
         }
       }
     } else {
-      for (List<List<TsFileResource>> partitionUnSequenceTsFileResource : unSequenceTsFileResources
-          .values()) {
-        for (List<TsFileResource> levelTsFileResource : partitionUnSequenceTsFileResource) {
-          levelTsFileResource.removeAll(tsFileResourceList);
+      synchronized (unSequenceTsFileResources) {
+        for (List<List<TsFileResource>> partitionUnSequenceTsFileResource : unSequenceTsFileResources
+            .values()) {
+          for (List<TsFileResource> levelTsFileResource : partitionUnSequenceTsFileResource) {
+            levelTsFileResource.removeAll(tsFileResourceList);
+          }
         }
       }
     }
@@ -203,28 +208,33 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     long timePartitionId = tsFileResource.getTimePartition();
     int level = getMergeLevel(tsFileResource.getTsFile());
     if (sequence) {
-      if (level <= seqLevelNum - 1) {
-        // current file has too high level
-        sequenceTsFileResources
-            .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources).get(level)
-            .add(tsFileResource);
-      } else {
-        // current file has normal level
-        sequenceTsFileResources
-            .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources).get(seqLevelNum - 1)
-            .add(tsFileResource);
+      synchronized (sequenceTsFileResources) {
+        if (level <= seqLevelNum - 1) {
+          // current file has normal level
+          sequenceTsFileResources
+              .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources).get(level)
+              .add(tsFileResource);
+        } else {
+          // current file has too high level
+          sequenceTsFileResources
+              .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
+              .get(seqLevelNum - 1)
+              .add(tsFileResource);
+        }
       }
     } else {
-      if (level <= unseqLevelNum - 1) {
-        // current file has too high level
-        unSequenceTsFileResources
-            .computeIfAbsent(timePartitionId, this::newUnSequenceTsFileResources).get(level)
-            .add(tsFileResource);
-      } else {
-        // current file has normal level
-        unSequenceTsFileResources
-            .computeIfAbsent(timePartitionId, this::newUnSequenceTsFileResources)
-            .get(unseqLevelNum - 1).add(tsFileResource);
+      synchronized (unSequenceTsFileResources) {
+        if (level <= unseqLevelNum - 1) {
+          // current file has normal level
+          unSequenceTsFileResources
+              .computeIfAbsent(timePartitionId, this::newUnSequenceTsFileResources).get(level)
+              .add(tsFileResource);
+        } else {
+          // current file has too high level
+          unSequenceTsFileResources
+              .computeIfAbsent(timePartitionId, this::newUnSequenceTsFileResources)
+              .get(unseqLevelNum - 1).add(tsFileResource);
+        }
       }
     }
   }
@@ -239,7 +249,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   @Override
   public boolean contains(TsFileResource tsFileResource, boolean sequence) {
     if (sequence) {
-      for (TreeSet<TsFileResource> sequenceTsFileResource : sequenceTsFileResources
+      for (SortedSet<TsFileResource> sequenceTsFileResource : sequenceTsFileResources
           .computeIfAbsent(tsFileResource.getTimePartition(), this::newSequenceTsFileResources)) {
         if (sequenceTsFileResource.contains(tsFileResource)) {
           return true;
@@ -266,9 +276,9 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   @SuppressWarnings("squid:S3776")
   public boolean isEmpty(boolean sequence) {
     if (sequence) {
-      for (List<TreeSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources
+      for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources
           .values()) {
-        for (TreeSet<TsFileResource> sequenceTsFileResource : partitionSequenceTsFileResource) {
+        for (SortedSet<TsFileResource> sequenceTsFileResource : partitionSequenceTsFileResource) {
           if (!sequenceTsFileResource.isEmpty()) {
             return false;
           }
@@ -291,7 +301,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   public int size(boolean sequence) {
     int result = 0;
     if (sequence) {
-      for (List<TreeSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources
+      for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource : sequenceTsFileResources
           .values()) {
         for (int i = seqLevelNum - 1; i >= 0; i--) {
           result += partitionSequenceTsFileResource.get(i).size();
@@ -324,75 +334,71 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         List<String> sourceFileList = logAnalyzer.getSourceFiles();
         long offset = logAnalyzer.getOffset();
         String targetFile = logAnalyzer.getTargetFile();
-        boolean isMergeFinished = logAnalyzer.isMergeFinished();
         boolean fullMerge = logAnalyzer.isFullMerge();
         boolean isSeq = logAnalyzer.isSeq();
-        if (targetFile == null) {
+        if (targetFile == null || sourceFileList.isEmpty()) {
+          return;
+        }
+        File target = new File(targetFile);
+        if (deviceSet.isEmpty()) {
+          // if not in compaction, just delete the target file
+          if (target.exists()) {
+            Files.delete(target.toPath());
+          }
           return;
         }
         if (fullMerge) {
-          if (!isMergeFinished) {
-            RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(new File(targetFile));
-            writer.getIOWriterOut().truncate(offset - 1);
+          // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
+          TsFileResource targetTsFileResource = getTsFileResource(targetFile, isSeq);
+          long timePartition = targetTsFileResource.getTimePartition();
+          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
+          // if not complete compaction, resume merge
+          if (writer.hasCrashed()) {
+            if (offset > 0) {
+              writer.getIOWriterOut().truncate(offset - 1);
+            }
             writer.close();
-            TsFileResource targetTsFileResource = getTsFileResource(targetFile, isSeq);
-            long timePartition = targetTsFileResource.getTimePartition();
+            CompactionLogger compactionLogger = new CompactionLogger(storageGroupDir,
+                storageGroupName);
             CompactionUtils
                 .merge(targetTsFileResource, getTsFileList(isSeq), storageGroupName,
-                    new CompactionLogger(storageGroupDir, storageGroupName), deviceSet, isSeq);
-            if (isSeq) {
-              for (int level = 0; level < sequenceTsFileResources.get(timePartition).size();
-                  level++) {
-                TreeSet<TsFileResource> currLevelMergeFile = sequenceTsFileResources
-                    .get(timePartition).get(level);
-                deleteLevelFilesInDisk(currLevelMergeFile);
-                deleteLevelFilesInList(timePartition, currLevelMergeFile, level, isSeq);
-              }
-            } else {
-              for (int level = 0; level < unSequenceTsFileResources.get(timePartition).size();
-                  level++) {
-                TreeSet<TsFileResource> currLevelMergeFile = sequenceTsFileResources
-                    .get(timePartition).get(level);
-                deleteLevelFilesInDisk(currLevelMergeFile);
-                deleteLevelFilesInList(timePartition, currLevelMergeFile, level, isSeq);
-              }
-            }
+                    compactionLogger, deviceSet, isSeq);
+            compactionLogger.close();
+          } else {
+            writer.close();
           }
+          // complete compaction and delete source file
+          deleteAllSubLevelFiles(isSeq, timePartition);
         } else {
+          // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
           TsFileResource targetResource = getTsFileResource(targetFile, isSeq);
           long timePartition = targetResource.getTimePartition();
-          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(new File(targetFile));
           List<TsFileResource> sourceTsFileResources = new ArrayList<>();
           for (String file : sourceFileList) {
+            // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
             sourceTsFileResources.add(getTsFileResource(file, isSeq));
           }
-          if (sourceFileList.isEmpty()) {
-            return;
-          }
           int level = getMergeLevel(new File(sourceFileList.get(0)));
-          if (!isMergeFinished) {
-            if (deviceSet.isEmpty()) {
-              Files.delete(new File(targetFile).toPath());
-            } else {
+          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
+          // if not complete compaction, resume merge
+          if (writer.hasCrashed()) {
+            if (offset > 0) {
               writer.getIOWriterOut().truncate(offset - 1);
-              writer.close();
-              if (isSeq) {
-                CompactionUtils
-                    .merge(targetResource, sourceTsFileResources, storageGroupName,
-                        new CompactionLogger(storageGroupDir, storageGroupName), deviceSet, true);
-                deleteLevelFilesInDisk(sourceTsFileResources);
-                deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
-                sequenceTsFileResources.get(timePartition).get(level + 1).add(targetResource);
-              } else {
-                CompactionUtils
-                    .merge(targetResource, sourceTsFileResources, storageGroupName,
-                        new CompactionLogger(storageGroupDir, storageGroupName), deviceSet, false);
-                deleteLevelFilesInDisk(sourceTsFileResources);
-                deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
-                unSequenceTsFileResources.get(timePartition).get(level + 1).add(targetResource);
-              }
             }
+            writer.close();
+            CompactionLogger compactionLogger = new CompactionLogger(storageGroupDir,
+                storageGroupName);
+            CompactionUtils
+                .merge(targetResource, sourceTsFileResources, storageGroupName,
+                    compactionLogger, deviceSet,
+                    isSeq);
+            compactionLogger.close();
+          } else {
+            writer.close();
           }
+          // complete compaction and delete source file
+          deleteLevelFilesInDisk(sourceTsFileResources);
+          deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
         }
       }
     } catch (IOException e) {
@@ -408,18 +414,42 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     }
   }
 
+  private void deleteAllSubLevelFiles(boolean isSeq, long timePartition) {
+    if (isSeq) {
+      for (int level = 0; level < sequenceTsFileResources.get(timePartition).size();
+          level++) {
+        SortedSet<TsFileResource> currLevelMergeFile = sequenceTsFileResources
+            .get(timePartition).get(level);
+        deleteLevelFilesInDisk(currLevelMergeFile);
+        deleteLevelFilesInList(timePartition, currLevelMergeFile, level, isSeq);
+      }
+    } else {
+      for (int level = 0; level < unSequenceTsFileResources.get(timePartition).size();
+          level++) {
+        SortedSet<TsFileResource> currLevelMergeFile = sequenceTsFileResources
+            .get(timePartition).get(level);
+        deleteLevelFilesInDisk(currLevelMergeFile);
+        deleteLevelFilesInList(timePartition, currLevelMergeFile, level, isSeq);
+      }
+    }
+  }
+
   @Override
   public void forkCurrentFileList(long timePartition) {
-    forkTsFileList(
-        forkedSequenceTsFileResources,
-        sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
-        seqLevelNum, seqFileNumInEachLevel);
+    synchronized (sequenceTsFileResources) {
+      forkTsFileList(
+          forkedSequenceTsFileResources,
+          sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
+          seqLevelNum, seqFileNumInEachLevel);
+    }
     // we have to copy all unseq file
-    forkTsFileList(
-        forkedUnSequenceTsFileResources,
-        unSequenceTsFileResources
-            .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources),
-        unseqLevelNum + 1, unseqFileNumInEachLevel);
+    synchronized (unSequenceTsFileResources) {
+      forkTsFileList(
+          forkedUnSequenceTsFileResources,
+          unSequenceTsFileResources
+              .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources),
+          unseqLevelNum + 1, unseqFileNumInEachLevel);
+    }
   }
 
   private void forkTsFileList(
@@ -480,6 +510,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           } else {
             CompactionLogger compactionLogger = new CompactionLogger(storageGroupDir,
                 storageGroupName);
+            // log source file list and target file for recover
             for (TsFileResource mergeResource : mergeResources.get(i)) {
               compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
             }
@@ -496,6 +527,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             }
 
             TsFileResource newResource = new TsFileResource(newLevelFile);
+            // merge, read from source files and write to target file
             CompactionUtils
                 .merge(newResource, toMergeTsFiles, storageGroupName, compactionLogger,
                     new HashSet<>(), sequence);
@@ -504,7 +536,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
                 storageGroupName, i, toMergeTsFiles.size());
             writeLock();
             try {
-              compactionLogger.logMergeFinish();
               if (sequence) {
                 sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
               } else {
@@ -546,17 +577,21 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     return new File(prefixPath + level + TSFILE_SUFFIX);
   }
 
-  private List<TreeSet<TsFileResource>> newSequenceTsFileResources(Long k) {
-    List<TreeSet<TsFileResource>> newSequenceTsFileResources = new CopyOnWriteArrayList<>();
+  private List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) {
+    List<SortedSet<TsFileResource>> newSequenceTsFileResources = new CopyOnWriteArrayList<>();
     for (int i = 0; i < seqLevelNum; i++) {
-      newSequenceTsFileResources.add(new TreeSet<>(
+      newSequenceTsFileResources.add(Collections.synchronizedSortedSet(new TreeSet<>(
           (o1, o2) -> {
-            int rangeCompare = Long
-                .compare(Long.parseLong(o1.getTsFile().getParentFile().getName()),
-                    Long.parseLong(o2.getTsFile().getParentFile().getName()));
-            return rangeCompare == 0 ? compareFileName(o1.getTsFile(), o2.getTsFile())
-                : rangeCompare;
-          }));
+            try {
+              int rangeCompare = Long
+                  .compare(Long.parseLong(o1.getTsFile().getParentFile().getName()),
+                      Long.parseLong(o2.getTsFile().getParentFile().getName()));
+              return rangeCompare == 0 ? compareFileName(o1.getTsFile(), o2.getTsFile())
+                  : rangeCompare;
+            } catch (NumberFormatException e) {
+              return compareFileName(o1.getTsFile(), o2.getTsFile());
+            }
+          })));
     }
     return newSequenceTsFileResources;
   }
@@ -578,9 +613,9 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
 
   private TsFileResource getTsFileResource(String filePath, boolean isSeq) throws IOException {
     if (isSeq) {
-      for (List<TreeSet<TsFileResource>> tsFileResourcesWithLevel : sequenceTsFileResources
+      for (List<SortedSet<TsFileResource>> tsFileResourcesWithLevel : sequenceTsFileResources
           .values()) {
-        for (TreeSet<TsFileResource> tsFileResources : tsFileResourcesWithLevel) {
+        for (SortedSet<TsFileResource> tsFileResources : tsFileResourcesWithLevel) {
           for (TsFileResource tsFileResource : tsFileResources) {
             if (tsFileResource.getTsFile().getAbsolutePath().equals(filePath)) {
               return tsFileResource;
@@ -603,4 +638,4 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     logger.error("cannot get tsfile resource path: {}", filePath);
     throw new IOException();
   }
-}
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index dec92e0..d80a41a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -47,11 +47,6 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
   }
 
   @Override
-  public List<TsFileResource> getStableTsFileList(boolean sequence) {
-    return getTsFileList(sequence);
-  }
-
-  @Override
   public List<TsFileResource> getTsFileList(boolean sequence) {
     if (sequence) {
       return new ArrayList<>(sequenceFileTreeSet);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 80c31cb..8152cc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -149,7 +149,9 @@ public class CompactionUtils {
       chunkWriter = new ChunkWriterImpl(
           IoTDB.metaManager.getSeriesSchema(new PartialPath(device), entry.getKey()));
     } catch (MetadataException e) {
-      throw new IOException(e);
+      // this may be caused by a restart during integration tests (IT)
+      logger.error("{} get schema {} error,skip this sensor", device, entry.getKey());
+      return maxVersion;
     }
     for (TimeValuePair timeValuePair : timeValuePairMap.values()) {
       writeTVPair(timeValuePair, chunkWriter);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a91661b..5fb31f6 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1317,14 +1317,6 @@ public class StorageGroupProcessor {
                 (System.currentTimeMillis() - startTime) / 1000);
           }
         }
-        while (compactionMergeWorking) {
-          closeStorageGroupCondition.wait(100);
-          if (System.currentTimeMillis() - startTime > 60_000) {
-            logger
-                .warn("{} has spent {}s to wait for closing compaction.", this.storageGroupName,
-                    (System.currentTimeMillis() - startTime) / 1000);
-          }
-        }
       } catch (InterruptedException e) {
         logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
             + "group {}", storageGroupName, e);
@@ -1726,6 +1718,10 @@ public class StorageGroupProcessor {
     } else {
       closingUnSequenceTsFileProcessor.remove(tsFileProcessor);
     }
+    synchronized (closeStorageGroupCondition) {
+      closeStorageGroupCondition.notifyAll();
+    }
+    logger.info("signal closing storage group condition in {}", storageGroupName);
     if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance()
         .isTerminated()) {
       compactionMergeWorking = true;
@@ -1745,10 +1741,6 @@ public class StorageGroupProcessor {
       logger.info("{} last compaction merge task is working, skip current merge",
           storageGroupName);
     }
-    synchronized (closeStorageGroupCondition) {
-      closeStorageGroupCondition.notifyAll();
-    }
-    logger.info("signal closing storage group condition in {}", storageGroupName);
   }
 
   /**
@@ -1756,9 +1748,6 @@ public class StorageGroupProcessor {
    */
   private void closeCompactionMergeCallBack() {
     this.compactionMergeWorking = false;
-    synchronized (closeStorageGroupCondition) {
-      closeStorageGroupCondition.notifyAll();
-    }
   }
 
   /**
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index c1ea53e..00db06a 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
@@ -78,6 +79,8 @@ public class EnvironmentUtils {
       .parseBoolean(System.getProperty("test.port.closed", "false"));
 
   public static void cleanEnv() throws IOException, StorageEngineException {
+    // wait for all compaction tasks to finish
+    CompactionMergeTaskPoolManager.getInstance().waitAllCompactionFinish();
     logger.warn("EnvironmentUtil cleanEnv...");
     if (daemon != null) {
       daemon.stop();