You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/03/31 02:57:23 UTC

[iotdb] branch rel/0.11 updated: [To rel/0.11] Add level compaction to merge command (#2948)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new eb9c591  [To rel/0.11] Add level compaction to merge command (#2948)
eb9c591 is described below

commit eb9c5918a92aab3dd6f9e88a674b45b9a0dc6eab
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Wed Mar 31 10:57:00 2021 +0800

    [To rel/0.11] Add level compaction to merge command (#2948)
    
    cherry-picked from master(0.12.0-SNAPSHOT)
---
 .../db/engine/compaction/TsFileManagement.java     |  141 ++-
 .../level/LevelCompactionTsFileManagement.java     |    2 -
 .../engine/storagegroup/StorageGroupProcessor.java | 1252 +++++++++++---------
 .../storagegroup/StorageGroupProcessorTest.java    |  154 ++-
 4 files changed, 844 insertions(+), 705 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 0522066..afda967 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -55,9 +55,7 @@ public abstract class TsFileManagement {
   protected String storageGroupName;
   protected String storageGroupDir;
 
-  /**
-   * Serialize queries, delete resource files, compaction cleanup files
-   */
+  /** Serialize queries, delete resource files, compaction cleanup files */
   private final ReadWriteLock compactionMergeLock = new ReentrantReadWriteLock();
 
   public volatile boolean isUnseqMerging = false;
@@ -67,76 +65,57 @@ public abstract class TsFileManagement {
    * be invisible at this moment, without this, deletion/update during merge could be lost.
    */
   public ModificationFile mergingModification;
+
   private long mergeStartTime;
 
+  protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
+
   public TsFileManagement(String storageGroupName, String storageGroupDir) {
     this.storageGroupName = storageGroupName;
     this.storageGroupDir = storageGroupDir;
   }
 
-  /**
-   * get the TsFile list in sequence
-   */
+  public void setForceFullMerge(boolean forceFullMerge) {
+    isForceFullMerge = forceFullMerge;
+  }
+
+  /** get the TsFile list in sequence */
   public abstract List<TsFileResource> getTsFileList(boolean sequence);
 
-  /**
-   * get the TsFile list iterator in sequence
-   */
+  /** get the TsFile list iterator in sequence */
   public abstract Iterator<TsFileResource> getIterator(boolean sequence);
 
-  /**
-   * remove one TsFile from list
-   */
+  /** remove one TsFile from list */
   public abstract void remove(TsFileResource tsFileResource, boolean sequence);
 
-  /**
-   * remove some TsFiles from list
-   */
+  /** remove some TsFiles from list */
   public abstract void removeAll(List<TsFileResource> tsFileResourceList, boolean sequence);
 
-  /**
-   * add one TsFile to list
-   */
+  /** add one TsFile to list */
   public abstract void add(TsFileResource tsFileResource, boolean sequence);
 
-  /**
-   * add one TsFile to list for recover
-   */
+  /** add one TsFile to list for recover */
   public abstract void addRecover(TsFileResource tsFileResource, boolean sequence);
 
-  /**
-   * add some TsFiles to list
-   */
+  /** add some TsFiles to list */
   public abstract void addAll(List<TsFileResource> tsFileResourceList, boolean sequence);
 
-  /**
-   * is one TsFile contained in list
-   */
+  /** is one TsFile contained in list */
   public abstract boolean contains(TsFileResource tsFileResource, boolean sequence);
 
-  /**
-   * clear list
-   */
+  /** clear list */
   public abstract void clear();
 
-  /**
-   * is the list empty
-   */
+  /** is the list empty */
   public abstract boolean isEmpty(boolean sequence);
 
-  /**
-   * return TsFile list size
-   */
+  /** return TsFile list size */
   public abstract int size(boolean sequence);
 
-  /**
-   * recover TsFile list
-   */
+  /** recover TsFile list */
   public abstract void recover();
 
-  /**
-   * fork current TsFile list (call this before merge)
-   */
+  /** fork current TsFile list (call this before merge) */
   public abstract void forkCurrentFileList(long timePartition) throws IOException;
 
   public void readLock() {
@@ -166,8 +145,8 @@ public abstract class TsFileManagement {
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
     private long timePartitionId;
 
-    public CompactionMergeTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack,
-        long timePartitionId) {
+    public CompactionMergeTask(
+        CloseCompactionMergeCallBack closeCompactionMergeCallBack, long timePartitionId) {
       this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
       this.timePartitionId = timePartitionId;
     }
@@ -194,11 +173,16 @@ public abstract class TsFileManagement {
     }
   }
 
-  public void merge(boolean fullMerge, List<TsFileResource> seqMergeList,
-      List<TsFileResource> unSeqMergeList, long dataTTL) {
+  public void merge(
+      boolean fullMerge,
+      List<TsFileResource> seqMergeList,
+      List<TsFileResource> unSeqMergeList,
+      long dataTTL) {
     if (isUnseqMerging) {
       if (logger.isInfoEnabled()) {
-        logger.info("{} Last merge is ongoing, currently consumed time: {}ms", storageGroupName,
+        logger.info(
+            "{} Last merge is ongoing, currently consumed time: {}ms",
+            storageGroupName,
             (System.currentTimeMillis() - mergeStartTime));
       }
       return;
@@ -235,8 +219,8 @@ public abstract class TsFileManagement {
     try {
       List[] mergeFiles = fileSelector.select();
       if (mergeFiles.length == 0) {
-        logger.info("{} cannot select merge candidates under the budget {}", storageGroupName,
-            budget);
+        logger.info(
+            "{} cannot select merge candidates under the budget {}", storageGroupName, budget);
         isUnseqMerging = false;
         return;
       }
@@ -255,15 +239,25 @@ public abstract class TsFileManagement {
       }
 
       mergeStartTime = System.currentTimeMillis();
-      MergeTask mergeTask = new MergeTask(mergeResource, storageGroupDir,
-          this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(),
-          storageGroupName);
-      mergingModification = new ModificationFile(
-          storageGroupDir + File.separator + MERGING_MODIFICATION_FILE_NAME);
+      MergeTask mergeTask =
+          new MergeTask(
+              mergeResource,
+              storageGroupDir,
+              this::mergeEndAction,
+              taskName,
+              fullMerge,
+              fileSelector.getConcurrentMergeNum(),
+              storageGroupName);
+      mergingModification =
+          new ModificationFile(storageGroupDir + File.separator + MERGING_MODIFICATION_FILE_NAME);
       MergeManager.getINSTANCE().submitMainTask(mergeTask);
       if (logger.isInfoEnabled()) {
-        logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
-            storageGroupName, taskName, mergeFiles[0].size(), mergeFiles[1].size());
+        logger.info(
+            "{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
+            storageGroupName,
+            taskName,
+            mergeFiles[0].size(),
+            mergeFiles[1].size());
       }
 
     } catch (MergeException | IOException e) {
@@ -283,9 +277,7 @@ public abstract class TsFileManagement {
     }
   }
 
-  /**
-   * acquire the write locks of the resource , the merge lock and the compaction lock
-   */
+  /** acquire the write locks of the resource , the merge lock and the compaction lock */
   private void doubleWriteLock(TsFileResource seqFile) {
     boolean fileLockGot;
     boolean compactionLockGot;
@@ -307,9 +299,7 @@ public abstract class TsFileManagement {
     }
   }
 
-  /**
-   * release the write locks of the resource , the merge lock and the compaction lock
-   */
+  /** release the write locks of the resource , the merge lock and the compaction lock */
   private void doubleWriteUnlock(TsFileResource seqFile) {
     writeUnlock();
     seqFile.writeUnlock();
@@ -351,13 +341,16 @@ public abstract class TsFileManagement {
         try {
           seqFile.getModFile().close();
         } catch (IOException e) {
-          logger
-              .error("Cannot close the ModificationFile {}", seqFile.getModFile().getFilePath(), e);
+          logger.error(
+              "Cannot close the ModificationFile {}", seqFile.getModFile().getFilePath(), e);
         }
       }
     } catch (IOException e) {
-      logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
-          seqFile.getTsFile(), e);
+      logger.error(
+          "{} cannot clean the ModificationFile of {} after merge",
+          storageGroupName,
+          seqFile.getTsFile(),
+          e);
     }
   }
 
@@ -372,8 +365,8 @@ public abstract class TsFileManagement {
     }
   }
 
-  public void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
-      File mergeLog) {
+  public void mergeEndAction(
+      List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, File mergeLog) {
     logger.info("{} a merge task is ending...", storageGroupName);
 
     if (unseqFiles.isEmpty()) {
@@ -392,14 +385,14 @@ public abstract class TsFileManagement {
       try {
         updateMergeModification(seqFile);
         if (i == seqFiles.size() - 1) {
-          //FIXME if there is an exception, the the modification file will be not closed.
+          // FIXME if there is an exception, the modification file will not be closed.
           removeMergingModification();
           isUnseqMerging = false;
           Files.delete(mergeLog.toPath());
         }
       } catch (IOException e) {
-        logger.error("{} a merge task ends but cannot delete log {}", storageGroupName,
-            mergeLog.toPath());
+        logger.error(
+            "{} a merge task ends but cannot delete log {}", storageGroupName, mergeLog.toPath());
       } finally {
         doubleWriteUnlock(seqFile);
       }
@@ -410,10 +403,8 @@ public abstract class TsFileManagement {
 
   // ({systemTime}-{versionNum}-{mergeNum}.tsfile)
   public static int compareFileName(File o1, File o2) {
-    String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "")
-        .split(FILE_NAME_SEPARATOR);
-    String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "")
-        .split(FILE_NAME_SEPARATOR);
+    String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
+    String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
     long ver1 = Long.parseLong(items1[0]);
     long ver2 = Long.parseLong(items2[0]);
     int cmp = Long.compare(ver1, ver2);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index fbc7ef4..0cc543e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -76,8 +76,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
 
   private final boolean enableUnseqCompaction =
       IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
-  private final boolean isForceFullMerge =
-      IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
 
   // First map is partition list; Second list is level list; Third list is file list in level;
   private final Map<Long, List<SortedSet<TsFileResource>>> sequenceTsFileResources =
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 344f914..9c9db9f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -101,22 +101,22 @@ import org.slf4j.LoggerFactory;
 
 /**
  * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
- * TsFileProcessor in the working status. <br/>
- * <p>
- * There are two situations to set the working TsFileProcessor to closing status:<br/>
- * <p>
- * (1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
- * shouldClose())<br/>
- * <p>
- * (2) someone calls syncCloseAllWorkingTsFileProcessors(). (up to now, only flush command from cli
- * will call this method)<br/>
- * <p>
- * UnSequence data has the similar process as above.
- * <p>
- * When a sequence TsFileProcessor is submitted to be flushed, the updateLatestFlushTimeCallback()
- * method will be called as a callback.<br/>
- * <p>
- * When a TsFileProcessor is closed, the closeUnsealedTsFileProcessorCallBack() method will be
+ * TsFileProcessor in the working status. <br>
+ *
+ * <p>There are two situations to set the working TsFileProcessor to closing status:<br>
+ *
+ * <p>(1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
+ * shouldClose())<br>
+ *
+ * <p>(2) someone calls syncCloseAllWorkingTsFileProcessors(). (up to now, only flush command from
+ * cli will call this method)<br>
+ *
+ * <p>UnSequence data has the similar process as above.
+ *
+ * <p>When a sequence TsFileProcessor is submitted to be flushed, the
+ * updateLatestFlushTimeCallback() method will be called as a callback.<br>
+ *
+ * <p>When a TsFileProcessor is closed, the closeUnsealedTsFileProcessorCallBack() method will be
  * called as a callback.
  */
 public class StorageGroupProcessor {
@@ -136,13 +136,9 @@ public class StorageGroupProcessor {
   private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
 
   private final boolean enableMemControl = config.isEnableMemControl();
-  /**
-   * indicating the file to be loaded already exists locally.
-   */
+  /** indicating the file to be loaded already exists locally. */
   private static final int POS_ALREADY_EXIST = -2;
-  /**
-   * indicating the file to be loaded overlap with some files.
-   */
+  /** indicating the file to be loaded overlap with some files. */
   private static final int POS_OVERLAP = -3;
   /**
    * a read write lock for guaranteeing concurrent safety when accessing all fields in this class
@@ -151,35 +147,29 @@ public class StorageGroupProcessor {
    * partitionLatestFlushedTimeForEachDevice)
    */
   private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
-  /**
-   * closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done.
-   */
+  /** closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done. */
   private final Object closeStorageGroupCondition = new Object();
   /**
    * avoid some tsfileResource is changed (e.g., from unsealed to sealed) when a query is executed.
    */
   private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
-  /**
-   * time partition id in the storage group -> tsFileProcessor for this time partition
-   */
+  /** time partition id in the storage group -> tsFileProcessor for this time partition */
   private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
-  /**
-   * time partition id in the storage group -> tsFileProcessor for this time partition
-   */
+  /** time partition id in the storage group -> tsFileProcessor for this time partition */
   private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
-  /**
-   * compactionMergeWorking is used to wait for last compaction to be done.
-   */
+  /** compactionMergeWorking is used to wait for last compaction to be done. */
   private volatile boolean compactionMergeWorking = false;
   // upgrading sequence TsFile resource list
   private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
 
-  private CopyOnReadLinkedList<TsFileProcessor> closingSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
+  private CopyOnReadLinkedList<TsFileProcessor> closingSequenceTsFileProcessor =
+      new CopyOnReadLinkedList<>();
 
   // upgrading unsequence TsFile resource list
   private List<TsFileResource> upgradeUnseqFileList = new LinkedList<>();
 
-  private CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
+  private CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor =
+      new CopyOnReadLinkedList<>();
   /*
    * time partition id -> map, which contains
    * device -> global latest timestamp of each device latestTimeForEachDevice caches non-flushed
@@ -196,16 +186,17 @@ public class StorageGroupProcessor {
    */
   private Map<Long, Map<String, Long>> partitionLatestFlushedTimeForEachDevice = new HashMap<>();
 
-  /**
-   * used to record the latest flush time while upgrading and inserting
-   */
-  private Map<Long, Map<String, Long>> newlyFlushedPartitionLatestFlushedTimeForEachDevice = new HashMap<>();
+  /** used to record the latest flush time while upgrading and inserting */
+  private Map<Long, Map<String, Long>> newlyFlushedPartitionLatestFlushedTimeForEachDevice =
+      new HashMap<>();
   /**
    * global mapping of device -> largest timestamp of the latest memtable to * be submitted to
    * asyncTryToFlush, globalLatestFlushedTimeForEachDevice is utilized to maintain global
-   * latestFlushedTime of devices and will be updated along with partitionLatestFlushedTimeForEachDevice
+   * latestFlushedTime of devices and will be updated along with
+   * partitionLatestFlushedTimeForEachDevice
    */
   private Map<String, Long> globalLatestFlushedTimeForEachDevice = new HashMap<>();
+
   private String storageGroupName;
   private File storageGroupSysDir;
 
@@ -224,6 +215,7 @@ public class StorageGroupProcessor {
    * eventually removed.
    */
   private long dataTTL = Long.MAX_VALUE;
+
   private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
   private TsFileFlushPolicy fileFlushPolicy;
 
@@ -260,24 +252,27 @@ public class StorageGroupProcessor {
   private List<CloseFileListener> customCloseFileListeners = Collections.emptyList();
   private List<FlushListener> customFlushListeners = Collections.emptyList();
 
-  public StorageGroupProcessor(String systemDir, String storageGroupName,
-      TsFileFlushPolicy fileFlushPolicy) throws StorageGroupProcessorException {
+  public StorageGroupProcessor(
+      String systemDir, String storageGroupName, TsFileFlushPolicy fileFlushPolicy)
+      throws StorageGroupProcessorException {
     this.storageGroupName = storageGroupName;
     this.fileFlushPolicy = fileFlushPolicy;
 
     storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, storageGroupName);
     if (storageGroupSysDir.mkdirs()) {
-      logger.info("Storage Group system Directory {} doesn't exist, create it",
+      logger.info(
+          "Storage Group system Directory {} doesn't exist, create it",
           storageGroupSysDir.getPath());
     } else if (!storageGroupSysDir.exists()) {
-      logger.error("create Storage Group system Directory {} failed",
-          storageGroupSysDir.getPath());
+      logger.error("create Storage Group system Directory {} failed", storageGroupSysDir.getPath());
     }
-    this.tsFileManagement = IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
-        .getTsFileManagement(storageGroupName, storageGroupSysDir.getAbsolutePath());
+    this.tsFileManagement =
+        IoTDBDescriptor.getInstance()
+            .getConfig()
+            .getCompactionStrategy()
+            .getTsFileManagement(storageGroupName, storageGroupSysDir.getAbsolutePath());
 
     recover();
-
   }
 
   private Map<Long, List<TsFileResource>> splitResourcesByPartition(
@@ -294,23 +289,23 @@ public class StorageGroupProcessor {
 
     try {
       // collect candidate TsFiles from sequential and unsequential data directory
-      Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair = getAllFiles(
-          DirectoryManager.getInstance().getAllSequenceFileFolders());
+      Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair =
+          getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
       List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
       List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
       upgradeSeqFileList.addAll(oldSeqTsFiles);
-      Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair = getAllFiles(
-          DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+      Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair =
+          getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
       List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
       List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
       upgradeUnseqFileList.addAll(oldUnseqTsFiles);
 
       // split by partition so that we can find the last file of each partition and decide to
       // close it or not
-      Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles = splitResourcesByPartition(
-          tmpSeqTsFiles);
-      Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles = splitResourcesByPartition(
-          tmpUnseqTsFiles);
+      Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
+          splitResourcesByPartition(tmpSeqTsFiles);
+      Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
+          splitResourcesByPartition(tmpUnseqTsFiles);
       for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
         recoverTsFiles(value, true);
       }
@@ -319,33 +314,38 @@ public class StorageGroupProcessor {
       }
 
       String taskName = storageGroupName + "-" + System.currentTimeMillis();
-      File mergingMods = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir,
-          MERGING_MODIFICATION_FILE_NAME);
+      File mergingMods =
+          SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MERGING_MODIFICATION_FILE_NAME);
       if (mergingMods.exists()) {
         this.tsFileManagement.mergingModification = new ModificationFile(mergingMods.getPath());
       }
-      RecoverMergeTask recoverMergeTask = new RecoverMergeTask(
-          new ArrayList<>(tsFileManagement.getTsFileList(true)),
-          tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(),
-          tsFileManagement::mergeEndAction,
-          taskName,
-          IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
+      RecoverMergeTask recoverMergeTask =
+          new RecoverMergeTask(
+              new ArrayList<>(tsFileManagement.getTsFileList(true)),
+              tsFileManagement.getTsFileList(false),
+              storageGroupSysDir.getPath(),
+              tsFileManagement::mergeEndAction,
+              taskName,
+              IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(),
+              storageGroupName);
       logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
-      recoverMergeTask
-          .recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
+      recoverMergeTask.recoverMerge(
+          IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
       if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
         mergingMods.delete();
       }
       recoverCompaction();
       for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
         long partitionNum = resource.getTimePartition();
-        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+        partitionDirectFileVersions
+            .computeIfAbsent(partitionNum, p -> new HashSet<>())
             .addAll(resource.getHistoricalVersions());
         updatePartitionFileVersion(partitionNum, resource.getMaxVersion());
       }
       for (TsFileResource resource : tsFileManagement.getTsFileList(false)) {
         long partitionNum = resource.getTimePartition();
-        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+        partitionDirectFileVersions
+            .computeIfAbsent(partitionNum, p -> new HashSet<>())
             .addAll(resource.getHistoricalVersions());
         updatePartitionFileVersion(partitionNum, resource.getMaxVersion());
       }
@@ -363,7 +363,8 @@ public class StorageGroupProcessor {
         long endTime = resource.getEndTime(index);
         endTimeMap.put(deviceId, endTime);
       }
-      latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
+      latestTimeForEachDevice
+          .computeIfAbsent(timePartitionId, l -> new HashMap<>())
           .putAll(endTimeMap);
       partitionLatestFlushedTimeForEachDevice
           .computeIfAbsent(timePartitionId, id -> new HashMap<>())
@@ -385,8 +386,7 @@ public class StorageGroupProcessor {
         logger.error("{} compaction submit task failed", storageGroupName);
       }
     } else {
-      logger.error("{} compaction pool not started ,recover failed",
-          storageGroupName);
+      logger.error("{} compaction pool not started ,recover failed", storageGroupName);
     }
   }
 
@@ -404,8 +404,8 @@ public class StorageGroupProcessor {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private void updateLatestFlushedTime() throws IOException {
 
-    VersionController versionController = new SimpleFileVersionController(
-        storageGroupSysDir.getPath());
+    VersionController versionController =
+        new SimpleFileVersionController(storageGroupSysDir.getPath());
     long currentVersion = versionController.currVersion();
     for (TsFileResource resource : upgradeSeqFileList) {
       for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) {
@@ -413,27 +413,31 @@ public class StorageGroupProcessor {
         int index = entry.getValue();
         long endTime = resource.getEndTime(index);
         long endTimePartitionId = StorageEngine.getTimePartition(endTime);
-        latestTimeForEachDevice.computeIfAbsent(endTimePartitionId, l -> new HashMap<>())
+        latestTimeForEachDevice
+            .computeIfAbsent(endTimePartitionId, l -> new HashMap<>())
             .put(deviceId, endTime);
         globalLatestFlushedTimeForEachDevice.put(deviceId, endTime);
 
         // set all the covered partition's LatestFlushedTime to Long.MAX_VALUE
         long partitionId = StorageEngine.getTimePartition(resource.getStartTime(index));
         while (partitionId <= endTimePartitionId) {
-          partitionLatestFlushedTimeForEachDevice.computeIfAbsent(partitionId, l -> new HashMap<>())
+          partitionLatestFlushedTimeForEachDevice
+              .computeIfAbsent(partitionId, l -> new HashMap<>())
               .put(deviceId, Long.MAX_VALUE);
           if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
-            File directory = SystemFileFactory.INSTANCE
-                .getFile(storageGroupSysDir, String.valueOf(partitionId));
+            File directory =
+                SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, String.valueOf(partitionId));
             if (!directory.exists()) {
               directory.mkdirs();
             }
-            File versionFile = SystemFileFactory.INSTANCE
-                .getFile(directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
+            File versionFile =
+                SystemFileFactory.INSTANCE.getFile(
+                    directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
             if (!versionFile.createNewFile()) {
               logger.warn("Version file {} has already been created ", versionFile);
             }
-            timePartitionIdVersionControllerMap.put(partitionId,
+            timePartitionIdVersionControllerMap.put(
+                partitionId,
                 new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
           }
           partitionId++;
@@ -449,7 +453,8 @@ public class StorageGroupProcessor {
    * @return version controller
    */
   private VersionController getVersionControllerByTimePartitionId(long timePartitionId) {
-    return timePartitionIdVersionControllerMap.computeIfAbsent(timePartitionId,
+    return timePartitionIdVersionControllerMap.computeIfAbsent(
+        timePartitionId,
         id -> {
           try {
             return new SimpleFileVersionController(storageGroupSysDir.getPath(), timePartitionId);
@@ -480,22 +485,20 @@ public class StorageGroupProcessor {
       // the process was interrupted before the merged files could be named
       continueFailedRenames(fileFolder, MERGE_SUFFIX);
 
-      File[] oldTsfileArray = fsFactory
-          .listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
-      File[] oldResourceFileArray = fsFactory
-          .listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
-      File[] oldModificationFileArray = fsFactory
-          .listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX);
+      File[] oldTsfileArray =
+          fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
+      File[] oldResourceFileArray =
+          fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
+      File[] oldModificationFileArray =
+          fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX);
       File upgradeFolder = fsFactory.getFile(fileFolder, IoTDBConstant.UPGRADE_FOLDER_NAME);
       // move the old files to upgrade folder if exists
       if (oldTsfileArray.length != 0 || oldResourceFileArray.length != 0) {
         // create upgrade directory if not exist
         if (upgradeFolder.mkdirs()) {
-          logger.info("Upgrade Directory {} doesn't exist, create it",
-              upgradeFolder.getPath());
+          logger.info("Upgrade Directory {} doesn't exist, create it", upgradeFolder.getPath());
         } else if (!upgradeFolder.exists()) {
-          logger.error("Create upgrade Directory {} failed",
-              upgradeFolder.getPath());
+          logger.error("Create upgrade Directory {} failed", upgradeFolder.getPath());
         }
         // move .tsfile to upgrade folder
         for (File file : oldTsfileArray) {
@@ -516,12 +519,14 @@ public class StorageGroupProcessor {
           }
         }
 
-        Collections.addAll(upgradeFiles,
+        Collections.addAll(
+            upgradeFiles,
             fsFactory.listFilesBySuffix(upgradeFolder.getAbsolutePath(), TSFILE_SUFFIX));
       }
-      // if already move old files to upgradeFolder 
+      // if already move old files to upgradeFolder
       else if (upgradeFolder.exists()) {
-        Collections.addAll(upgradeFiles,
+        Collections.addAll(
+            upgradeFiles,
             fsFactory.listFilesBySuffix(upgradeFolder.getAbsolutePath(), TSFILE_SUFFIX));
       }
 
@@ -531,20 +536,22 @@ public class StorageGroupProcessor {
           if (!partitionFolder.isDirectory()) {
             logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
           } else if (!partitionFolder.getName().equals(IoTDBConstant.UPGRADE_FOLDER_NAME)) {
-            // some TsFileResource may be being persisted when the system crashed, try recovering such
+            // some TsFileResource may be being persisted when the system crashed, try recovering
+            // such
             // resources
             continueFailedRenames(partitionFolder, TEMP_SUFFIX);
 
-            // some TsFiles were going to be replaced by the merged files when the system crashed and
+            // some TsFiles were going to be replaced by the merged files when the system crashed
+            // and
             // the process was interrupted before the merged files could be named
             continueFailedRenames(partitionFolder, MERGE_SUFFIX);
 
-            Collections.addAll(tsFiles,
+            Collections.addAll(
+                tsFiles,
                 fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
           }
         }
       }
-
     }
     tsFiles.sort(this::compareFileName);
     List<TsFileResource> ret = new ArrayList<>();
@@ -579,10 +586,13 @@ public class StorageGroupProcessor {
     for (int i = 0; i < tsFiles.size(); i++) {
       TsFileResource tsFileResource = tsFiles.get(i);
       long timePartitionId = tsFileResource.getTimePartition();
-      TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(
-          storageGroupName + FILE_NAME_SEPARATOR,
-          getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, isSeq,
-          i == tsFiles.size() - 1);
+      TsFileRecoverPerformer recoverPerformer =
+          new TsFileRecoverPerformer(
+              storageGroupName + FILE_NAME_SEPARATOR,
+              getVersionControllerByTimePartitionId(timePartitionId),
+              tsFileResource,
+              isSeq,
+              i == tsFiles.size() - 1);
 
       RestorableTsFileIOWriter writer;
       try {
@@ -600,8 +610,8 @@ public class StorageGroupProcessor {
           writer = recoverPerformer.recover(true);
         }
       } catch (StorageGroupProcessorException e) {
-        logger.warn("Skip TsFile: {} because of error in recover: ", tsFileResource.getTsFilePath(),
-            e);
+        logger.warn(
+            "Skip TsFile: {} because of error in recover: ", tsFileResource.getTsFilePath(), e);
         continue;
       }
 
@@ -612,29 +622,41 @@ public class StorageGroupProcessor {
         // the last file is not closed, continue writing to in
         TsFileProcessor tsFileProcessor;
         if (isSeq) {
-          tsFileProcessor = new TsFileProcessor(storageGroupName, storageGroupInfo, tsFileResource,
-              getVersionControllerByTimePartitionId(timePartitionId),
-              this::closeUnsealedTsFileProcessorCallBack, this::updateLatestFlushTimeCallback,
-              true, writer);
+          tsFileProcessor =
+              new TsFileProcessor(
+                  storageGroupName,
+                  storageGroupInfo,
+                  tsFileResource,
+                  getVersionControllerByTimePartitionId(timePartitionId),
+                  this::closeUnsealedTsFileProcessorCallBack,
+                  this::updateLatestFlushTimeCallback,
+                  true,
+                  writer);
           if (enableMemControl) {
             TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
             tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
             this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-            tsFileProcessorInfo.addTSPMemCost(tsFileProcessor
-                .getTsFileResource().calculateRamSize());
+            tsFileProcessorInfo.addTSPMemCost(
+                tsFileProcessor.getTsFileResource().calculateRamSize());
           }
           workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
         } else {
-          tsFileProcessor = new TsFileProcessor(storageGroupName, storageGroupInfo, tsFileResource,
-              getVersionControllerByTimePartitionId(timePartitionId),
-              this::closeUnsealedTsFileProcessorCallBack, this::unsequenceFlushCallback, false,
-              writer);
+          tsFileProcessor =
+              new TsFileProcessor(
+                  storageGroupName,
+                  storageGroupInfo,
+                  tsFileResource,
+                  getVersionControllerByTimePartitionId(timePartitionId),
+                  this::closeUnsealedTsFileProcessorCallBack,
+                  this::unsequenceFlushCallback,
+                  false,
+                  writer);
           if (enableMemControl) {
             TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
             tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
             this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-            tsFileProcessorInfo.addTSPMemCost(tsFileProcessor
-                .getTsFileResource().calculateRamSize());
+            tsFileProcessorInfo.addTSPMemCost(
+                tsFileProcessor.getTsFileResource().calculateRamSize());
           }
           workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
         }
@@ -661,10 +683,8 @@ public class StorageGroupProcessor {
 
   // ({systemTime}-{versionNum}-{mergeNum}.tsfile)
   private int compareFileName(File o1, File o2) {
-    String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "")
-        .split(FILE_NAME_SEPARATOR);
-    String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "")
-        .split(FILE_NAME_SEPARATOR);
+    String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
+    String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
     long ver1 = Long.parseLong(items1[0]);
     long ver2 = Long.parseLong(items2[0]);
     int cmp = Long.compare(ver1, ver2);
@@ -688,16 +708,18 @@ public class StorageGroupProcessor {
       // init map
       long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime());
 
-      partitionLatestFlushedTimeForEachDevice
-          .computeIfAbsent(timePartitionId, id -> new HashMap<>());
+      partitionLatestFlushedTimeForEachDevice.computeIfAbsent(
+          timePartitionId, id -> new HashMap<>());
 
       boolean isSequence =
-          insertRowPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
-              .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
-
-      //is unsequence and user set config to discard out of order data
-      if (!isSequence && IoTDBDescriptor.getInstance().getConfig()
-          .isEnableDiscardOutOfOrderData()) {
+          insertRowPlan.getTime()
+              > partitionLatestFlushedTimeForEachDevice
+                  .get(timePartitionId)
+                  .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+
+      // the data is unsequence and the user has configured out-of-order data to be discarded
+      if (!isSequence
+          && IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
         return;
       }
 
@@ -740,8 +762,10 @@ public class StorageGroupProcessor {
         long currTime = insertTabletPlan.getTimes()[loc];
         // skip points that do not satisfy TTL
         if (!isAlive(currTime)) {
-          results[loc] = RpcUtils.getStatus(TSStatusCode.OUT_OF_TTL_ERROR,
-              "time " + currTime + " in current line is out of TTL: " + dataTTL);
+          results[loc] =
+              RpcUtils.getStatus(
+                  TSStatusCode.OUT_OF_TTL_ERROR,
+                  "time " + currTime + " in current line is out of TTL: " + dataTTL);
           loc++;
           noFailure = false;
         } else {
@@ -755,12 +779,13 @@ public class StorageGroupProcessor {
       // before is first start point
       int before = loc;
       // before time partition
-      long beforeTimePartition = StorageEngine
-          .getTimePartition(insertTabletPlan.getTimes()[before]);
+      long beforeTimePartition =
+          StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]);
       // init map
-      long lastFlushTime = partitionLatestFlushedTimeForEachDevice.
-          computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
-          computeIfAbsent(insertTabletPlan.getDeviceId().getFullPath(), id -> Long.MIN_VALUE);
+      long lastFlushTime =
+          partitionLatestFlushedTimeForEachDevice
+              .computeIfAbsent(beforeTimePartition, id -> new HashMap<>())
+              .computeIfAbsent(insertTabletPlan.getDeviceId().getFullPath(), id -> Long.MIN_VALUE);
       // if is sequence
       boolean isSequence = false;
       while (loc < insertTabletPlan.getRowCount()) {
@@ -769,18 +794,21 @@ public class StorageGroupProcessor {
         // start next partition
         if (curTimePartition != beforeTimePartition) {
           // insert last time partition
-          if (isSequence || !IoTDBDescriptor.getInstance().getConfig()
-              .isEnableDiscardOutOfOrderData()) {
-            noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence,
-                results,
-                beforeTimePartition) && noFailure;
+          if (isSequence
+              || !IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
+            noFailure =
+                insertTabletToTsFileProcessor(
+                        insertTabletPlan, before, loc, isSequence, results, beforeTimePartition)
+                    && noFailure;
           }
           // re initialize
           before = loc;
           beforeTimePartition = curTimePartition;
-          lastFlushTime = partitionLatestFlushedTimeForEachDevice.
-              computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
-              computeIfAbsent(insertTabletPlan.getDeviceId().getFullPath(), id -> Long.MIN_VALUE);
+          lastFlushTime =
+              partitionLatestFlushedTimeForEachDevice
+                  .computeIfAbsent(beforeTimePartition, id -> new HashMap<>())
+                  .computeIfAbsent(
+                      insertTabletPlan.getDeviceId().getFullPath(), id -> Long.MIN_VALUE);
           isSequence = false;
         }
         // still in this partition
@@ -790,8 +818,9 @@ public class StorageGroupProcessor {
             // insert into unsequence and then start sequence
             if (!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
               noFailure =
-                  insertTabletToTsFileProcessor(insertTabletPlan, before, loc, false, results,
-                      beforeTimePartition) && noFailure;
+                  insertTabletToTsFileProcessor(
+                          insertTabletPlan, before, loc, false, results, beforeTimePartition)
+                      && noFailure;
             }
             before = loc;
             isSequence = true;
@@ -801,13 +830,17 @@ public class StorageGroupProcessor {
       }
 
       // do not forget last part
-      if (before < loc && (isSequence || !IoTDBDescriptor.getInstance().getConfig()
-          .isEnableDiscardOutOfOrderData())) {
-        noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence,
-            results, beforeTimePartition) && noFailure;
-      }
-      long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
-          insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+      if (before < loc
+          && (isSequence
+              || !IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) {
+        noFailure =
+            insertTabletToTsFileProcessor(
+                    insertTabletPlan, before, loc, isSequence, results, beforeTimePartition)
+                && noFailure;
+      }
+      long globalLatestFlushedTime =
+          globalLatestFlushedTimeForEachDevice.getOrDefault(
+              insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
       tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);
 
       if (!noFailure) {
@@ -818,9 +851,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  /**
-   * @return whether the given time falls in ttl
-   */
+  /** @return whether the given time falls in ttl */
   private boolean isAlive(long time) {
     return dataTTL == Long.MAX_VALUE || (System.currentTimeMillis() - time) <= dataTTL;
   }
@@ -837,8 +868,13 @@ public class StorageGroupProcessor {
    * @param timePartitionId time partition id
    * @return false if any failure occurs when inserting the tablet, true otherwise
    */
-  private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
-      int start, int end, boolean sequence, TSStatus[] results, long timePartitionId) {
+  private boolean insertTabletToTsFileProcessor(
+      InsertTabletPlan insertTabletPlan,
+      int start,
+      int end,
+      boolean sequence,
+      TSStatus[] results,
+      long timePartitionId) {
     // return when start >= end
     if (start >= end) {
       return true;
@@ -847,8 +883,10 @@ public class StorageGroupProcessor {
     TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
     if (tsFileProcessor == null) {
       for (int i = start; i < end; i++) {
-        results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
-            "can not create TsFileProcessor, timePartitionId: " + timePartitionId);
+        results[i] =
+            RpcUtils.getStatus(
+                TSStatusCode.INTERNAL_SERVER_ERROR,
+                "can not create TsFileProcessor, timePartitionId: " + timePartitionId);
       }
       return false;
     }
@@ -865,10 +903,13 @@ public class StorageGroupProcessor {
 
     latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>());
     // try to update the latest time of the device of this tsRecord
-    if (sequence && latestTimeForEachDevice.get(timePartitionId)
-        .getOrDefault(insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE)
-        < insertTabletPlan.getTimes()[end - 1]) {
-      latestTimeForEachDevice.get(timePartitionId)
+    if (sequence
+        && latestTimeForEachDevice
+                .get(timePartitionId)
+                .getOrDefault(insertTabletPlan.getDeviceId().getFullPath(), Long.MIN_VALUE)
+            < insertTabletPlan.getTimes()[end - 1]) {
+      latestTimeForEachDevice
+          .get(timePartitionId)
           .put(insertTabletPlan.getDeviceId().getFullPath(), insertTabletPlan.getTimes()[end - 1]);
     }
 
@@ -890,14 +931,18 @@ public class StorageGroupProcessor {
       }
       // Update cached last value with high priority
       if (mNodes[i] != null) {
-        // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to update last cache
-        IoTDB.metaManager.updateLastCache(null,
-            plan.composeLastTimeValuePair(i), true, latestFlushedTime, mNodes[i]);
+        // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
+        // update last cache
+        IoTDB.metaManager.updateLastCache(
+            null, plan.composeLastTimeValuePair(i), true, latestFlushedTime, mNodes[i]);
       } else {
         // measurementMNodes[i] is null, use the path to update remote cache
-        IoTDB.metaManager
-            .updateLastCache(plan.getDeviceId().concatNode(plan.getMeasurements()[i]),
-                plan.composeLastTimeValuePair(i), true, latestFlushedTime, null);
+        IoTDB.metaManager.updateLastCache(
+            plan.getDeviceId().concatNode(plan.getMeasurements()[i]),
+            plan.composeLastTimeValuePair(i),
+            true,
+            latestFlushedTime,
+            null);
       }
     }
   }
@@ -915,15 +960,18 @@ public class StorageGroupProcessor {
     tsFileProcessor.insert(insertRowPlan);
 
     // try to update the latest time of the device of this tsRecord
-    if (latestTimeForEachDevice.get(timePartitionId)
-        .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE) < insertRowPlan
-        .getTime()) {
-      latestTimeForEachDevice.get(timePartitionId)
+    if (latestTimeForEachDevice
+            .get(timePartitionId)
+            .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE)
+        < insertRowPlan.getTime()) {
+      latestTimeForEachDevice
+          .get(timePartitionId)
           .put(insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getTime());
     }
 
-    long globalLatestFlushTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
-        insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+    long globalLatestFlushTime =
+        globalLatestFlushedTimeForEachDevice.getOrDefault(
+            insertRowPlan.getDeviceId().getFullPath(), Long.MIN_VALUE);
 
     tryToUpdateInsertLastCache(insertRowPlan, globalLatestFlushTime);
 
@@ -944,13 +992,17 @@ public class StorageGroupProcessor {
       }
       // Update cached last value with high priority
       if (mNodes[i] != null) {
-        // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to update last cache
-        IoTDB.metaManager.updateLastCache(null,
-            plan.composeTimeValuePair(i), true, latestFlushedTime, mNodes[i]);
+        // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
+        // update last cache
+        IoTDB.metaManager.updateLastCache(
+            null, plan.composeTimeValuePair(i), true, latestFlushedTime, mNodes[i]);
       } else {
-        IoTDB.metaManager
-            .updateLastCache(plan.getDeviceId().concatNode(plan.getMeasurements()[i]),
-                plan.composeTimeValuePair(i), true, latestFlushedTime, null);
+        IoTDB.metaManager.updateLastCache(
+            plan.getDeviceId().concatNode(plan.getMeasurements()[i]),
+            plan.composeTimeValuePair(i),
+            true,
+            latestFlushedTime,
+            null);
       }
     }
   }
@@ -958,8 +1010,8 @@ public class StorageGroupProcessor {
   public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
     writeLock();
     try {
-      if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) &&
-          !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+      if (!closingSequenceTsFileProcessor.contains(tsFileProcessor)
+          && !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
         fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
       }
     } finally {
@@ -971,11 +1023,11 @@ public class StorageGroupProcessor {
     TsFileProcessor tsFileProcessor = null;
     try {
       if (sequence) {
-        tsFileProcessor = getOrCreateTsFileProcessorIntern(timeRangeId,
-            workSequenceTsFileProcessors, true);
+        tsFileProcessor =
+            getOrCreateTsFileProcessorIntern(timeRangeId, workSequenceTsFileProcessors, true);
       } else {
-        tsFileProcessor = getOrCreateTsFileProcessorIntern(timeRangeId,
-            workUnsequenceTsFileProcessors, false);
+        tsFileProcessor =
+            getOrCreateTsFileProcessorIntern(timeRangeId, workUnsequenceTsFileProcessors, false);
       }
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
@@ -983,9 +1035,8 @@ public class StorageGroupProcessor {
           e);
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
     } catch (IOException e) {
-      logger
-          .error("meet IOException when creating TsFileProcessor, change system mode to read-only",
-              e);
+      logger.error(
+          "meet IOException when creating TsFileProcessor, change system mode to read-only", e);
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
     }
     return tsFileProcessor;
@@ -998,9 +1049,8 @@ public class StorageGroupProcessor {
    * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
    * @param sequence whether is sequence or not
    */
-  private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
-      TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
-      boolean sequence)
+  private TsFileProcessor getOrCreateTsFileProcessorIntern(
+      long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap, boolean sequence)
       throws IOException, DiskSpaceInsufficientException {
 
     TsFileProcessor res;
@@ -1015,7 +1065,8 @@ public class StorageGroupProcessor {
           Map.Entry<Long, TsFileProcessor> processorEntry = tsFileProcessorTreeMap.firstEntry();
           logger.info(
               "will close a {} TsFile because too many active partitions ({} > {}) in the storage group {},",
-              sequence, tsFileProcessorTreeMap.size(),
+              sequence,
+              tsFileProcessorTreeMap.size(),
               IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition(),
               storageGroupName);
           asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue());
@@ -1038,7 +1089,6 @@ public class StorageGroupProcessor {
     return res;
   }
 
-
   private TsFileProcessor createTsFileProcessor(boolean sequence, long timePartitionId)
       throws IOException, DiskSpaceInsufficientException {
     String baseDir;
@@ -1050,37 +1100,49 @@ public class StorageGroupProcessor {
     fsFactory.getFile(baseDir, storageGroupName).mkdirs();
 
     String filePath =
-        baseDir + File.separator + storageGroupName + File.separator + timePartitionId
+        baseDir
+            + File.separator
+            + storageGroupName
+            + File.separator
+            + timePartitionId
             + File.separator
             + getNewTsFileName(timePartitionId);
 
     TsFileProcessor tsFileProcessor;
     VersionController versionController = getVersionControllerByTimePartitionId(timePartitionId);
     if (sequence) {
-      tsFileProcessor = new TsFileProcessor(storageGroupName,
-          fsFactory.getFileWithParent(filePath), storageGroupInfo,
-          versionController, this::closeUnsealedTsFileProcessorCallBack,
-          this::updateLatestFlushTimeCallback, true,
-          partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
+      tsFileProcessor =
+          new TsFileProcessor(
+              storageGroupName,
+              fsFactory.getFileWithParent(filePath),
+              storageGroupInfo,
+              versionController,
+              this::closeUnsealedTsFileProcessorCallBack,
+              this::updateLatestFlushTimeCallback,
+              true,
+              partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
       if (enableMemControl) {
         TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
         tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
         this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-        tsFileProcessorInfo.addTSPMemCost(tsFileProcessor
-            .getTsFileResource().calculateRamSize());
+        tsFileProcessorInfo.addTSPMemCost(tsFileProcessor.getTsFileResource().calculateRamSize());
       }
     } else {
-      tsFileProcessor = new TsFileProcessor(storageGroupName,
-          fsFactory.getFileWithParent(filePath), storageGroupInfo,
-          versionController, this::closeUnsealedTsFileProcessorCallBack,
-          this::unsequenceFlushCallback, false,
-          partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
+      tsFileProcessor =
+          new TsFileProcessor(
+              storageGroupName,
+              fsFactory.getFileWithParent(filePath),
+              storageGroupInfo,
+              versionController,
+              this::closeUnsealedTsFileProcessorCallBack,
+              this::unsequenceFlushCallback,
+              false,
+              partitionMaxFileVersions.getOrDefault(timePartitionId, 0L));
       if (enableMemControl) {
         TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
         tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
         this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-        tsFileProcessorInfo.addTSPMemCost(tsFileProcessor
-            .getTsFileResource().calculateRamSize());
+        tsFileProcessorInfo.addTSPMemCost(tsFileProcessor.getTsFileResource().calculateRamSize());
       }
     }
     tsFileProcessor.addCloseFileListeners(customCloseFileListeners);
@@ -1103,8 +1165,7 @@ public class StorageGroupProcessor {
   }
 
   private String getNewTsFileName(long time, long version, int mergeCnt) {
-    return time + FILE_NAME_SEPARATOR + version
-        + FILE_NAME_SEPARATOR + mergeCnt + TSFILE_SUFFIX;
+    return time + FILE_NAME_SEPARATOR + version + FILE_NAME_SEPARATOR + mergeCnt + TSFILE_SUFFIX;
   }
 
   public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
@@ -1116,31 +1177,33 @@ public class StorageGroupProcessor {
             || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
           closeStorageGroupCondition.wait(60_000);
           if (System.currentTimeMillis() - startTime > 60_000) {
-            logger
-                .warn("{} has spent {}s to wait for closing one tsfile.", this.storageGroupName,
-                    (System.currentTimeMillis() - startTime) / 1000);
+            logger.warn(
+                "{} has spent {}s to wait for closing one tsfile.",
+                this.storageGroupName,
+                (System.currentTimeMillis() - startTime) / 1000);
           }
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        logger
-            .error("syncCloseOneTsFileProcessor error occurs while waiting for closing the storage "
-                + "group {}", storageGroupName, e);
+        logger.error(
+            "syncCloseOneTsFileProcessor error occurs while waiting for closing the storage "
+                + "group {}",
+            storageGroupName,
+            e);
       }
     }
   }
 
-  /**
-   * thread-safety should be ensured by caller
-   */
+  /** thread-safety should be ensured by caller */
   public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
-    //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
-    //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
-    if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
-        closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+    // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
+    // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
+    if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
+        || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
       return;
     }
-    logger.info("Async close tsfile: {}",
+    logger.info(
+        "Async close tsfile: {}",
         tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
     if (sequence) {
       closingSequenceTsFileProcessor.add(tsFileProcessor);
@@ -1148,7 +1211,8 @@ public class StorageGroupProcessor {
       tsFileProcessor.asyncClose();
 
       workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
-      // if unsequence files don't contain this time range id, we should remove it's version controller
+      // if unsequence files don't contain this time range id, we should remove its version
+      // controller
       if (!workUnsequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
         timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
       }
@@ -1158,16 +1222,15 @@ public class StorageGroupProcessor {
       tsFileProcessor.asyncClose();
 
       workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
-      // if sequence files don't contain this time range id, we should remove it's version controller
+      // if sequence files don't contain this time range id, we should remove its version
+      // controller
       if (!workSequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
         timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
       }
     }
   }
 
-  /**
-   * delete the storageGroup's own folder in folder data/system/storage_groups
-   */
+  /** delete the storageGroup's own folder in folder data/system/storage_groups */
   public void deleteFolder(String systemDir) {
     logger.info("{} will close all files for deleting data folder {}", storageGroupName, systemDir);
     writeLock();
@@ -1205,16 +1268,17 @@ public class StorageGroupProcessor {
     logger.info("{} will close all files for deleting data files", storageGroupName);
     writeLock();
     syncCloseAllWorkingTsFileProcessors();
-    //normally, mergingModification is just need to be closed by after a merge task is finished.
-    //we close it here just for IT test.
+    // normally, mergingModification only needs to be closed after a merge task is finished.
+    // we close it here just for IT test.
     if (this.tsFileManagement.mergingModification != null) {
       try {
         this.tsFileManagement.mergingModification.close();
       } catch (IOException e) {
-        logger.error("Cannot close the mergingMod file {}",
-            this.tsFileManagement.mergingModification.getFilePath(), e);
+        logger.error(
+            "Cannot close the mergingMod file {}",
+            this.tsFileManagement.mergingModification.getFilePath(),
+            e);
       }
-
     }
     try {
       closeAllResources();
@@ -1246,9 +1310,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  /**
-   * Iterate each TsFile and try to lock and remove those out of TTL.
-   */
+  /** Iterate each TsFile and try to lock and remove those out of TTL. */
   public synchronized void checkFilesTTL() {
     if (dataTTL == Long.MAX_VALUE) {
       logger.debug("{}: TTL not set, ignore the check", storageGroupName);
@@ -1272,7 +1334,8 @@ public class StorageGroupProcessor {
   }
 
   private void checkFileTTL(TsFileResource resource, long timeLowerBound, boolean isSeq) {
-    if (resource.isMerging() || !resource.isClosed()
+    if (resource.isMerging()
+        || !resource.isClosed()
         || !resource.isDeleted() && resource.stillLives(timeLowerBound)) {
       return;
     }
@@ -1293,8 +1356,11 @@ public class StorageGroupProcessor {
           // physical removal
           resource.remove();
           if (logger.isInfoEnabled()) {
-            logger.info("Removed a file {} before {} by ttl ({}ms)", resource.getTsFilePath(),
-                new Date(timeLowerBound), dataTTL);
+            logger.info(
+                "Removed a file {} before {} by ttl ({}ms)",
+                resource.getTsFilePath(),
+                new Date(timeLowerBound),
+                dataTTL);
           }
           tsFileManagement.remove(resource, isSeq);
         } finally {
@@ -1306,25 +1372,28 @@ public class StorageGroupProcessor {
     }
   }
 
-  /**
-   * This method will be blocked until all tsfile processors are closed.
-   */
+  /** This method will be blocked until all tsfile processors are closed. */
   public void syncCloseAllWorkingTsFileProcessors() {
     synchronized (closeStorageGroupCondition) {
       try {
         asyncCloseAllWorkingTsFileProcessors();
         long startTime = System.currentTimeMillis();
-        while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor
-            .isEmpty()) {
+        while (!closingSequenceTsFileProcessor.isEmpty()
+            || !closingUnSequenceTsFileProcessor.isEmpty()) {
           closeStorageGroupCondition.wait(60_000);
           if (System.currentTimeMillis() - startTime > 60_000) {
-            logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
+            logger.warn(
+                "{} has spent {}s to wait for closing all TsFiles.",
+                this.storageGroupName,
                 (System.currentTimeMillis() - startTime) / 1000);
           }
         }
       } catch (InterruptedException e) {
-        logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
-            + "group {}", storageGroupName, e);
+        logger.error(
+            "CloseFileNodeCondition error occurs while waiting for closing the storage "
+                + "group {}",
+            storageGroupName,
+            e);
         Thread.currentThread().interrupt();
       }
     }
@@ -1335,13 +1404,13 @@ public class StorageGroupProcessor {
     try {
       logger.info("async force close all files in storage group: {}", storageGroupName);
       // to avoid concurrent modification problem, we need a new array list
-      for (TsFileProcessor tsFileProcessor : new ArrayList<>(
-          workSequenceTsFileProcessors.values())) {
+      for (TsFileProcessor tsFileProcessor :
+          new ArrayList<>(workSequenceTsFileProcessors.values())) {
         asyncCloseOneTsFileProcessor(true, tsFileProcessor);
       }
       // to avoid concurrent modification problem, we need a new array list
-      for (TsFileProcessor tsFileProcessor : new ArrayList<>(
-          workUnsequenceTsFileProcessors.values())) {
+      for (TsFileProcessor tsFileProcessor :
+          new ArrayList<>(workUnsequenceTsFileProcessors.values())) {
         asyncCloseOneTsFileProcessor(false, tsFileProcessor);
       }
     } finally {
@@ -1354,13 +1423,13 @@ public class StorageGroupProcessor {
     try {
       logger.info("force close all processors in storage group: {}", storageGroupName);
       // to avoid concurrent modification problem, we need a new array list
-      for (TsFileProcessor tsFileProcessor : new ArrayList<>(
-          workSequenceTsFileProcessors.values())) {
+      for (TsFileProcessor tsFileProcessor :
+          new ArrayList<>(workSequenceTsFileProcessors.values())) {
         tsFileProcessor.putMemTableBackAndClose();
       }
       // to avoid concurrent modification problem, we need a new array list
-      for (TsFileProcessor tsFileProcessor : new ArrayList<>(
-          workUnsequenceTsFileProcessors.values())) {
+      for (TsFileProcessor tsFileProcessor :
+          new ArrayList<>(workUnsequenceTsFileProcessors.values())) {
         tsFileProcessor.putMemTableBackAndClose();
       }
     } finally {
@@ -1369,18 +1438,34 @@ public class StorageGroupProcessor {
   }
 
   // TODO need a read lock, please consider the concurrency with flush manager threads.
-  public QueryDataSource query(PartialPath deviceId, String measurementId, QueryContext context,
-      QueryFileManager filePathsManager, Filter timeFilter) throws QueryProcessException {
+  public QueryDataSource query(
+      PartialPath deviceId,
+      String measurementId,
+      QueryContext context,
+      QueryFileManager filePathsManager,
+      Filter timeFilter)
+      throws QueryProcessException {
     insertLock.readLock().lock();
     try {
-      List<TsFileResource> seqResources = getFileResourceListForQuery(
-          tsFileManagement.getTsFileList(true),
-          upgradeSeqFileList, deviceId, measurementId, context, timeFilter, true);
-      List<TsFileResource> unseqResources = getFileResourceListForQuery(
-          tsFileManagement.getTsFileList(false),
-          upgradeUnseqFileList, deviceId, measurementId, context, timeFilter, false);
-      QueryDataSource dataSource = new QueryDataSource(deviceId,
-          seqResources, unseqResources);
+      List<TsFileResource> seqResources =
+          getFileResourceListForQuery(
+              tsFileManagement.getTsFileList(true),
+              upgradeSeqFileList,
+              deviceId,
+              measurementId,
+              context,
+              timeFilter,
+              true);
+      List<TsFileResource> unseqResources =
+          getFileResourceListForQuery(
+              tsFileManagement.getTsFileList(false),
+              upgradeUnseqFileList,
+              deviceId,
+              measurementId,
+              context,
+              timeFilter,
+              false);
+      QueryDataSource dataSource = new QueryDataSource(deviceId, seqResources, unseqResources);
       // used files should be added before mergeLock is unlocked, or they may be deleted by
       // running merge
       // is null only in tests
@@ -1404,32 +1489,40 @@ public class StorageGroupProcessor {
     insertLock.writeLock().unlock();
   }
 
-
   /**
    * @param tsFileResources includes sealed and unsealed tsfile resources
    * @return fill unsealed tsfile resources with memory data and ChunkMetadataList of data in disk
    */
   private List<TsFileResource> getFileResourceListForQuery(
-      Collection<TsFileResource> tsFileResources, List<TsFileResource> upgradeTsFileResources,
-      PartialPath deviceId, String measurementId, QueryContext context, Filter timeFilter,
+      Collection<TsFileResource> tsFileResources,
+      List<TsFileResource> upgradeTsFileResources,
+      PartialPath deviceId,
+      String measurementId,
+      QueryContext context,
+      Filter timeFilter,
       boolean isSeq)
       throws MetadataException {
 
     if (context.isDebug()) {
-      DEBUG_LOGGER
-          .info("Path: {}.{}, get tsfile list: {} isSeq: {} timefilter: {}", deviceId.getFullPath(),
-              measurementId, tsFileResources, isSeq, (timeFilter == null ? "null" : timeFilter));
+      DEBUG_LOGGER.info(
+          "Path: {}.{}, get tsfile list: {} isSeq: {} timefilter: {}",
+          deviceId.getFullPath(),
+          measurementId,
+          tsFileResources,
+          isSeq,
+          (timeFilter == null ? "null" : timeFilter));
     }
 
     MeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(deviceId, measurementId);
 
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
-    long timeLowerBound = dataTTL != Long.MAX_VALUE ? System.currentTimeMillis() - dataTTL : Long
-        .MIN_VALUE;
+    long timeLowerBound =
+        dataTTL != Long.MAX_VALUE ? System.currentTimeMillis() - dataTTL : Long.MIN_VALUE;
     context.setQueryTimeLowerBound(timeLowerBound);
 
     for (TsFileResource tsFileResource : tsFileResources) {
-      if (!isTsFileResourceSatisfied(tsFileResource, deviceId.getFullPath(), timeFilter, isSeq, context.isDebug())) {
+      if (!isTsFileResourceSatisfied(
+          tsFileResource, deviceId.getFullPath(), timeFilter, isSeq, context.isDebug())) {
         continue;
       }
       closeQueryLock.readLock().lock();
@@ -1437,10 +1530,16 @@ public class StorageGroupProcessor {
         if (tsFileResource.isClosed()) {
           tsfileResourcesForQuery.add(tsFileResource);
         } else {
-          tsFileResource.getUnsealedFileProcessor()
-              .query(deviceId.getFullPath(), measurementId, schema.getType(),
+          tsFileResource
+              .getUnsealedFileProcessor()
+              .query(
+                  deviceId.getFullPath(),
+                  measurementId,
+                  schema.getType(),
                   schema.getEncodingType(),
-                  schema.getProps(), context, tsfileResourcesForQuery);
+                  schema.getProps(),
+                  context,
+                  tsfileResourcesForQuery);
         }
       } catch (IOException e) {
         throw new MetadataException(e);
@@ -1450,7 +1549,8 @@ public class StorageGroupProcessor {
     }
     // for upgrade files and old files must be closed
     for (TsFileResource tsFileResource : upgradeTsFileResources) {
-      if (!isTsFileResourceSatisfied(tsFileResource, deviceId.getFullPath(), timeFilter, isSeq, context.isDebug())) {
+      if (!isTsFileResourceSatisfied(
+          tsFileResource, deviceId.getFullPath(), timeFilter, isSeq, context.isDebug())) {
         continue;
       }
       closeQueryLock.readLock().lock();
@@ -1463,28 +1563,32 @@ public class StorageGroupProcessor {
     return tsfileResourcesForQuery;
   }
 
-  /**
-   * @return true if the device is contained in the TsFile and it lives beyond TTL
-   */
-  private boolean isTsFileResourceSatisfied(TsFileResource tsFileResource, String deviceId,
-      Filter timeFilter, boolean isSeq, boolean debug) {
+  /** @return true if the device is contained in the TsFile and it lives beyond TTL */
+  private boolean isTsFileResourceSatisfied(
+      TsFileResource tsFileResource,
+      String deviceId,
+      Filter timeFilter,
+      boolean isSeq,
+      boolean debug) {
     if (!tsFileResource.containsDevice(deviceId)) {
       if (debug) {
-        DEBUG_LOGGER.info("Path: {} file {} is not satisfied because of no device!", deviceId,
-            tsFileResource);
+        DEBUG_LOGGER.info(
+            "Path: {} file {} is not satisfied because of no device!", deviceId, tsFileResource);
       }
       return false;
     }
 
     int deviceIndex = tsFileResource.getDeviceToIndexMap().get(deviceId);
     long startTime = tsFileResource.getStartTime(deviceIndex);
-    long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex)
-        : Long.MAX_VALUE;
+    long endTime =
+        tsFileResource.isClosed() || !isSeq
+            ? tsFileResource.getEndTime(deviceIndex)
+            : Long.MAX_VALUE;
 
     if (!isAlive(endTime)) {
       if (debug) {
-        DEBUG_LOGGER
-            .info("Path: {} file {} is not satisfied because of ttl!", deviceId, tsFileResource);
+        DEBUG_LOGGER.info(
+            "Path: {} file {} is not satisfied because of ttl!", deviceId, tsFileResource);
       }
       return false;
     }
@@ -1492,8 +1596,8 @@ public class StorageGroupProcessor {
     if (timeFilter != null) {
       boolean res = timeFilter.satisfyStartEndTime(startTime, endTime);
       if (debug && !res) {
-        DEBUG_LOGGER.info("Path: {} file {} is not satisfied because of time filter!", deviceId,
-            tsFileResource);
+        DEBUG_LOGGER.info(
+            "Path: {} file {} is not satisfied because of time filter!", deviceId, tsFileResource);
       }
       return res;
     }
@@ -1512,7 +1616,7 @@ public class StorageGroupProcessor {
       throws IOException {
     // TODO: how to avoid partial deletion?
     // FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened
-    //mod files in mergingModification, sequenceFileList, and unsequenceFileList
+    // mod files in mergingModification, sequenceFileList, and unsequenceFileList
     tsFileManagement.readLock();
     writeLock();
 
@@ -1535,10 +1639,10 @@ public class StorageGroupProcessor {
         updatedModFiles.add(tsFileManagement.mergingModification);
       }
 
-      deleteDataInFiles(tsFileManagement.getTsFileList(true), deletion, devicePaths,
-          updatedModFiles, planIndex);
-      deleteDataInFiles(tsFileManagement.getTsFileList(false), deletion, devicePaths,
-          updatedModFiles, planIndex);
+      deleteDataInFiles(
+          tsFileManagement.getTsFileList(true), deletion, devicePaths, updatedModFiles, planIndex);
+      deleteDataInFiles(
+          tsFileManagement.getTsFileList(false), deletion, devicePaths, updatedModFiles, planIndex);
 
     } catch (Exception e) {
       // roll back
@@ -1552,8 +1656,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void logDeletion(long startTime, long endTime, PartialPath path)
-      throws IOException {
+  private void logDeletion(long startTime, long endTime, PartialPath path) throws IOException {
     long timePartitionStartId = StorageEngine.getTimePartition(startTime);
     long timePartitionEndId = StorageEngine.getTimePartition(endTime);
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
@@ -1572,25 +1675,32 @@ public class StorageGroupProcessor {
     }
   }
 
-  private boolean canSkipDelete(TsFileResource tsFileResource, Set<PartialPath> devicePaths,
-      long deleteStart, long deleteEnd) {
+  private boolean canSkipDelete(
+      TsFileResource tsFileResource,
+      Set<PartialPath> devicePaths,
+      long deleteStart,
+      long deleteEnd) {
     for (PartialPath device : devicePaths) {
-      if (tsFileResource.containsDevice(device.getFullPath()) &&
-          (deleteEnd >= tsFileResource.getStartTime(device.getFullPath()) &&
-              deleteStart <= tsFileResource
-                  .getOrDefaultEndTime(device.getFullPath(), Long.MAX_VALUE))) {
+      if (tsFileResource.containsDevice(device.getFullPath())
+          && (deleteEnd >= tsFileResource.getStartTime(device.getFullPath())
+              && deleteStart
+                  <= tsFileResource.getOrDefaultEndTime(device.getFullPath(), Long.MAX_VALUE))) {
         return false;
       }
     }
     return true;
   }
 
-  private void deleteDataInFiles(Collection<TsFileResource> tsFileResourceList, Deletion deletion,
-      Set<PartialPath> devicePaths, List<ModificationFile> updatedModFiles, long planIndex)
+  private void deleteDataInFiles(
+      Collection<TsFileResource> tsFileResourceList,
+      Deletion deletion,
+      Set<PartialPath> devicePaths,
+      List<ModificationFile> updatedModFiles,
+      long planIndex)
       throws IOException {
     for (TsFileResource tsFileResource : tsFileResourceList) {
-      if (canSkipDelete(tsFileResource, devicePaths, deletion.getStartTime(),
-          deletion.getEndTime())) {
+      if (canSkipDelete(
+          tsFileResource, devicePaths, deletion.getStartTime(), deletion.getEndTime())) {
         continue;
       }
 
@@ -1601,8 +1711,11 @@ public class StorageGroupProcessor {
       tsFileResource.getModFile().write(deletion);
       // remember to close mod file
       tsFileResource.getModFile().close();
-      logger.info("[Deletion] Deletion with path:{}, time:{}-{} written into mods file.",
-              deletion.getPath(), deletion.getStartTime(), deletion.getEndTime());
+      logger.info(
+          "[Deletion] Deletion with path:{}, time:{}-{} written into mods file.",
+          deletion.getPath(),
+          deletion.getStartTime(),
+          deletion.getEndTime());
 
       tsFileResource.updatePlanIndexes(planIndex);
 
@@ -1617,8 +1730,9 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void tryToDeleteLastCache(PartialPath deviceId, PartialPath originalPath,
-      long startTime, long endTime) throws WriteProcessException {
+  private void tryToDeleteLastCache(
+      PartialPath deviceId, PartialPath originalPath, long startTime, long endTime)
+      throws WriteProcessException {
     if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
       return;
     }
@@ -1626,13 +1740,16 @@ public class StorageGroupProcessor {
       MNode node = IoTDB.metaManager.getDeviceNode(deviceId);
 
       for (MNode measurementNode : node.getChildren().values()) {
-        if (measurementNode != null && originalPath
-            .matchFullPath(measurementNode.getPartialPath())) {
+        if (measurementNode != null
+            && originalPath.matchFullPath(measurementNode.getPartialPath())) {
           TimeValuePair lastPair = ((MeasurementMNode) measurementNode).getCachedLast();
-          if (lastPair != null && startTime <= lastPair.getTimestamp()
+          if (lastPair != null
+              && startTime <= lastPair.getTimestamp()
               && lastPair.getTimestamp() <= endTime) {
             ((MeasurementMNode) measurementNode).resetCache();
-            logger.info("[tryToDeleteLastCache] Last cache for path: {} is set to null", measurementNode.getFullPath());
+            logger.info(
+                "[tryToDeleteLastCache] Last cache for path: {} is set to null",
+                measurementNode.getFullPath());
           }
         }
       }
@@ -1650,8 +1767,8 @@ public class StorageGroupProcessor {
     TsFileResource resource = tsFileProcessor.getTsFileResource();
     for (Entry<String, Integer> startTime : resource.getDeviceToIndexMap().entrySet()) {
       String deviceId = startTime.getKey();
-      resource.forceUpdateEndTime(deviceId,
-          latestTimeForEachDevice.get(tsFileProcessor.getTimeRangeId()).get(deviceId));
+      resource.forceUpdateEndTime(
+          deviceId, latestTimeForEachDevice.get(tsFileProcessor.getTimeRangeId()).get(deviceId));
     }
   }
 
@@ -1661,13 +1778,15 @@ public class StorageGroupProcessor {
 
   private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) {
     // update the largest timestamp in the last flushing memtable
-    Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice
-        .get(processor.getTimeRangeId());
+    Map<String, Long> curPartitionDeviceLatestTime =
+        latestTimeForEachDevice.get(processor.getTimeRangeId());
 
     if (curPartitionDeviceLatestTime == null) {
-      logger.warn("Partition: {} does't have latest time for each device. "
+      logger.warn(
+          "Partition: {} does't have latest time for each device. "
               + "No valid record is written into memtable. Flushing tsfile is: {}",
-          processor.getTimeRangeId(), processor.getTsFileResource().getTsFile());
+          processor.getTimeRangeId(),
+          processor.getTsFileResource().getTsFile());
       return false;
     }
 
@@ -1675,34 +1794,33 @@ public class StorageGroupProcessor {
       partitionLatestFlushedTimeForEachDevice
           .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
           .put(entry.getKey(), entry.getValue());
-      updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
-          entry.getKey(), entry.getValue());
-      if (globalLatestFlushedTimeForEachDevice
-          .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+      updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+          processor.getTimeRangeId(), entry.getKey(), entry.getValue());
+      if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE)
+          < entry.getValue()) {
         globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
       }
     }
     return true;
   }
 
-
   /**
-   * <p>
-   *  update latest flush time for partition id
-   *  </>
+   * Update the latest flush time for the given partition id.
+   *
    * @param partitionId partition id
    * @param latestFlushTime lastest flush time
    * @return true if update latest flush time success
    */
   private boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
     // update the largest timestamp in the last flushing memtable
-    Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice
-            .get(partitionId);
+    Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice.get(partitionId);
 
     if (curPartitionDeviceLatestTime == null) {
-      logger.warn("Partition: {} does't have latest time for each device. "
-                      + "No valid record is written into memtable.  latest flush time is: {}",
-              partitionId, latestFlushTime);
+      logger.warn(
+          "Partition: {} does't have latest time for each device. "
+              + "No valid record is written into memtable.  latest flush time is: {}",
+          partitionId,
+          latestFlushTime);
       return false;
     }
 
@@ -1711,43 +1829,38 @@ public class StorageGroupProcessor {
       entry.setValue(latestFlushTime);
 
       partitionLatestFlushedTimeForEachDevice
-              .computeIfAbsent(partitionId, id -> new HashMap<>())
-              .put(entry.getKey(), entry.getValue());
+          .computeIfAbsent(partitionId, id -> new HashMap<>())
+          .put(entry.getKey(), entry.getValue());
       newlyFlushedPartitionLatestFlushedTimeForEachDevice
-              .computeIfAbsent(partitionId, id -> new HashMap<>())
-              .put(entry.getKey(), entry.getValue());
-      if (globalLatestFlushedTimeForEachDevice
-              .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+          .computeIfAbsent(partitionId, id -> new HashMap<>())
+          .put(entry.getKey(), entry.getValue());
+      if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE)
+          < entry.getValue()) {
         globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
       }
     }
     return true;
   }
 
-
-  /**
-   * used for upgrading
-   */
-  public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId,
-      String deviceId, long time) {
+  /** used for upgrading */
+  public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+      long partitionId, String deviceId, long time) {
     newlyFlushedPartitionLatestFlushedTimeForEachDevice
         .computeIfAbsent(partitionId, id -> new HashMap<>())
         .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
   }
 
-  /**
-   * put the memtable back to the MemTablePool and make the metadata in writer visible
-   */
+  /** put the memtable back to the MemTablePool and make the metadata in writer visible */
   // TODO please consider concurrency with query and insert method.
-  private void closeUnsealedTsFileProcessorCallBack(
-      TsFileProcessor tsFileProcessor) throws TsFileProcessorException {
+  private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor tsFileProcessor)
+      throws TsFileProcessorException {
     closeQueryLock.writeLock().lock();
     try {
       tsFileProcessor.close();
     } finally {
       closeQueryLock.writeLock().unlock();
     }
-    //closingSequenceTsFileProcessor is a thread safety class.
+    // closingSequenceTsFileProcessor is thread-safe.
     if (closingSequenceTsFileProcessor.contains(tsFileProcessor)) {
       closingSequenceTsFileProcessor.remove(tsFileProcessor);
     } else {
@@ -1757,30 +1870,34 @@ public class StorageGroupProcessor {
       closeStorageGroupCondition.notifyAll();
     }
     logger.info("signal closing storage group condition in {}", storageGroupName);
-    if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance()
-        .isTerminated()) {
+
+    executeCompaction(
+        tsFileProcessor.getTimeRangeId(),
+        IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+  }
+
+  private void executeCompaction(long timePartition, boolean fullMerge) {
+    if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
       compactionMergeWorking = true;
       logger.info("{} submit a compaction merge task", storageGroupName);
       try {
         // fork and filter current tsfile, then commit then to compaction merge
-        tsFileManagement.forkCurrentFileList(tsFileProcessor.getTimeRangeId());
+        tsFileManagement.forkCurrentFileList(timePartition);
+        tsFileManagement.setForceFullMerge(fullMerge);
         CompactionMergeTaskPoolManager.getInstance()
             .submitTask(
-                tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack,
-                    tsFileProcessor.getTimeRangeId()));
+                tsFileManagement
+                .new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
       } catch (IOException | RejectedExecutionException e) {
         this.closeCompactionMergeCallBack();
         logger.error("{} compaction submit task failed", storageGroupName);
       }
     } else {
-      logger.info("{} last compaction merge task is working, skip current merge",
-          storageGroupName);
+      logger.info("{} last compaction merge task is working, skip current merge", storageGroupName);
     }
   }
 
-  /**
-   * close compaction merge callback, to release some locks
-   */
+  /** close compaction merge callback, to release some locks */
   private void closeCompactionMergeCallBack() {
     this.compactionMergeWorking = false;
   }
@@ -1811,10 +1928,12 @@ public class StorageGroupProcessor {
     List<TsFileResource> upgradedResources = tsFileResource.getUpgradedResources();
     for (TsFileResource resource : upgradedResources) {
       long partitionId = resource.getTimePartition();
-      resource.getDeviceToIndexMap().forEach((device, index) ->
-          updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device,
-              resource.getEndTime(index))
-      );
+      resource
+          .getDeviceToIndexMap()
+          .forEach(
+              (device, index) ->
+                  updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+                      partitionId, device, resource.getEndTime(index)));
     }
     tsFileManagement.writeLock();
     writeLock();
@@ -1833,17 +1952,18 @@ public class StorageGroupProcessor {
 
     // after upgrade complete, update partitionLatestFlushedTimeForEachDevice
     if (countUpgradeFiles() == 0) {
-      for (Entry<Long, Map<String, Long>> entry : newlyFlushedPartitionLatestFlushedTimeForEachDevice
-          .entrySet()) {
+      for (Entry<Long, Map<String, Long>> entry :
+          newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet()) {
         long timePartitionId = entry.getKey();
-        Map<String, Long> latestFlushTimeForPartition = partitionLatestFlushedTimeForEachDevice
-            .getOrDefault(timePartitionId, new HashMap<>());
+        Map<String, Long> latestFlushTimeForPartition =
+            partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new HashMap<>());
         for (Entry<String, Long> endTimeMap : entry.getValue().entrySet()) {
           String device = endTimeMap.getKey();
           long endTime = endTimeMap.getValue();
           if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) {
             partitionLatestFlushedTimeForEachDevice
-                .computeIfAbsent(timePartitionId, id -> new HashMap<>()).put(device, endTime);
+                .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+                .put(device, endTime);
           }
         }
       }
@@ -1856,8 +1976,9 @@ public class StorageGroupProcessor {
   public void merge(boolean fullMerge) {
     writeLock();
     try {
-      this.tsFileManagement.merge(fullMerge, tsFileManagement.getTsFileList(true),
-          tsFileManagement.getTsFileList(false), dataTTL);
+      for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
+        executeCompaction(timePartitionId, fullMerge);
+      }
     } finally {
       writeUnlock();
     }
@@ -1865,15 +1986,14 @@ public class StorageGroupProcessor {
 
   /**
    * Load a new tsfile to storage group processor. Tne file may have overlap with other files.
-   * <p>
-   * or unsequence list.
-   * <p>
-   * Secondly, execute the loading process by the type.
-   * <p>
-   * Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
    *
-   * @param newTsFileResource tsfile resource
-   * @UsedBy sync module.
+   * <p>or unsequence list.
+   *
+   * <p>Secondly, execute the loading process by the type.
+   *
+   * <p>Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
+   *
+   * @param newTsFileResource tsfile resource; this method is used by the sync module.
    */
   public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
@@ -1881,14 +2001,18 @@ public class StorageGroupProcessor {
     tsFileManagement.writeLock();
     writeLock();
     try {
-      if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
+      if (loadTsFileByType(
+          LoadTsFileType.LOAD_SEQUENCE,
+          tsfileToBeInserted,
+          newTsFileResource,
           newFilePartitionId)) {
         updateLatestTimeMap(newTsFileResource);
       }
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
-          tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
+          tsfileToBeInserted.getAbsolutePath(),
+          tsfileToBeInserted.getParentFile().getName());
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
       throw new LoadFileException(e);
     } finally {
@@ -1898,18 +2022,18 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * Load a new tsfile to storage group processor. Tne file may have overlap with other files. <p>
-   * that there has no file which is overlapping with the new file.
-   * <p>
-   * Firstly, determine the loading type of the file, whether it needs to be loaded in sequence list
-   * or unsequence list.
-   * <p>
-   * Secondly, execute the loading process by the type.
-   * <p>
-   * Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
+   * Load a new tsfile into the storage group processor. The file may overlap with other files.
    *
-   * @param newTsFileResource tsfile resource
-   * @UsedBy load external tsfile module
+   * <p>that there has no file which is overlapping with the new file.
+   *
+   * <p>Firstly, determine the loading type of the file, whether it needs to be loaded in sequence
+   * list or unsequence list.
+   *
+   * <p>Secondly, execute the loading process by the type.
+   *
+   * <p>Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
+   *
+   * @param newTsFileResource tsfile resource; this method is used by the load external tsfile module.
    */
   public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
@@ -1926,36 +2050,49 @@ public class StorageGroupProcessor {
 
       // loading tsfile by type
       if (insertPos == POS_OVERLAP) {
-        loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, newTsFileResource,
+        loadTsFileByType(
+            LoadTsFileType.LOAD_UNSEQUENCE,
+            tsfileToBeInserted,
+            newTsFileResource,
             newFilePartitionId);
       } else {
 
         // check whether the file name needs to be renamed.
         if (!tsFileManagement.isEmpty(true)) {
-          String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos,
-              newTsFileResource.getTimePartition(), sequenceList);
+          String newFileName =
+              getFileNameForLoadingFile(
+                  tsfileToBeInserted.getName(),
+                  insertPos,
+                  newTsFileResource.getTimePartition(),
+                  sequenceList);
           if (!newFileName.equals(tsfileToBeInserted.getName())) {
-            logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
-                tsfileToBeInserted.getName(), newFileName);
-            newTsFileResource
-                .setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
+            logger.info(
+                "Tsfile {} must be renamed to {} for loading into the sequence list.",
+                tsfileToBeInserted.getName(),
+                newFileName);
+            newTsFileResource.setFile(
+                fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
           }
         }
-        loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
+        loadTsFileByType(
+            LoadTsFileType.LOAD_SEQUENCE,
+            tsfileToBeInserted,
+            newTsFileResource,
             newFilePartitionId);
       }
 
       // update latest time map
       updateLatestTimeMap(newTsFileResource);
       long partitionNum = newTsFileResource.getTimePartition();
-      partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+      partitionDirectFileVersions
+          .computeIfAbsent(partitionNum, p -> new HashSet<>())
           .addAll(newTsFileResource.getHistoricalVersions());
-      updatePartitionFileVersion(partitionNum,
-          newTsFileResource.getMaxVersion());
+      updatePartitionFileVersion(partitionNum, newTsFileResource.getMaxVersion());
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
-          tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
+          tsfileToBeInserted.getAbsolutePath(),
+          tsfileToBeInserted.getParentFile().getName());
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
       throw new LoadFileException(e);
     } finally {
@@ -1968,8 +2105,8 @@ public class StorageGroupProcessor {
    * Set the version in "partition" to "version" if "version" is larger than the current version.
    */
   public void setPartitionFileVersionToMax(long partition, long version) {
-    partitionMaxFileVersions
-        .compute(partition, (prt, oldVer) -> computeMaxVersion(oldVer, version));
+    partitionMaxFileVersions.compute(
+        partition, (prt, oldVer) -> computeMaxVersion(oldVer, version));
   }
 
   private long computeMaxVersion(Long oldVersion, Long newVersion) {
@@ -1980,14 +2117,15 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * Find the position of "newTsFileResource" in the sequence files if it can be inserted into
-   * them.
+   * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
    *
    * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted
-   * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
-   * file can be inserted between [i, i+1]
+   *     POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
+   *     file can be inserted between [i, i+1]
    */
-  private int findInsertionPosition(TsFileResource newTsFileResource, long newFilePartitionId,
+  private int findInsertionPosition(
+      TsFileResource newTsFileResource,
+      long newFilePartitionId,
       List<TsFileResource> sequenceList) {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
 
@@ -2028,7 +2166,7 @@ public class StorageGroupProcessor {
    * Compare each device in the two files to find the time relation of them.
    *
    * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than
-   * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)
+   *     fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)
    */
   private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
     boolean hasPre = false, hasSubsequence = false;
@@ -2084,8 +2222,8 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void removeFullyOverlapFiles(TsFileResource newTsFile, Iterator<TsFileResource> iterator
-      , boolean isSeq) {
+  private void removeFullyOverlapFiles(
+      TsFileResource newTsFile, Iterator<TsFileResource> iterator, boolean isSeq) {
     while (iterator.hasNext()) {
       TsFileResource existingTsFile = iterator.next();
       if (newTsFile.getHistoricalVersions().containsAll(existingTsFile.getHistoricalVersions())
@@ -2096,8 +2234,10 @@ public class StorageGroupProcessor {
         try {
           removeFullyOverlapFile(existingTsFile, iterator, isSeq);
         } catch (Exception e) {
-          logger.error("Something gets wrong while removing FullyOverlapFiles: {}",
-              existingTsFile.getTsFile().getAbsolutePath(), e);
+          logger.error(
+              "Something gets wrong while removing FullyOverlapFiles: {}",
+              existingTsFile.getTsFile().getAbsolutePath(),
+              e);
         } finally {
           existingTsFile.writeUnlock();
         }
@@ -2109,17 +2249,16 @@ public class StorageGroupProcessor {
    * remove the given tsFileResource. If the corresponding tsFileProcessor is in the working status,
    * close it before remove the related resource files. maybe time-consuming for closing a tsfile.
    */
-  private void removeFullyOverlapFile(TsFileResource tsFileResource,
-      Iterator<TsFileResource> iterator
-      , boolean isSeq) {
+  private void removeFullyOverlapFile(
+      TsFileResource tsFileResource, Iterator<TsFileResource> iterator, boolean isSeq) {
     if (!tsFileResource.isClosed()) {
       // also remove the TsFileProcessor if the overlapped file is not closed
       long timePartition = tsFileResource.getTimePartition();
-      Map<Long, TsFileProcessor> fileProcessorMap = isSeq ? workSequenceTsFileProcessors :
-          workUnsequenceTsFileProcessors;
+      Map<Long, TsFileProcessor> fileProcessorMap =
+          isSeq ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors;
       TsFileProcessor tsFileProcessor = fileProcessorMap.get(timePartition);
       if (tsFileProcessor != null && tsFileProcessor.getTsFileResource() == tsFileResource) {
-        //have to take some time to close the tsFileProcessor
+        // have to take some time to close the tsFileProcessor
         tsFileProcessor.syncClose();
         fileProcessorMap.remove(timePartition);
       }
@@ -2132,35 +2271,34 @@ public class StorageGroupProcessor {
   /**
    * Get an appropriate filename to ensure the order between files. The tsfile is named after
    * ({systemTime}-{versionNum}-{mergeNum}.tsfile).
-   * <p>
-   * The sorting rules for tsfile names @see {@link this#compareFileName}, we can restore the list
-   * based on the file name and ensure the correctness of the order, so there are three cases.
-   * <p>
-   * 1. The tsfile is to be inserted in the first place of the list. If the timestamp in the file
-   * name is less than the timestamp in the file name of the first tsfile  in the list, then the
-   * file name is legal and the file name is returned directly. Otherwise, its timestamp can be set
-   * to half of the timestamp value in the file name of the first tsfile in the list , and the
-   * version number is the version number in the file name of the first tsfile in the list.
-   * <p>
-   * 2. The tsfile is to be inserted in the last place of the list. If the timestamp in the file
-   * name is lager than the timestamp in the file name of the last tsfile  in the list, then the
-   * file name is legal and the file name is returned directly. Otherwise, the file name is
-   * generated by the system according to the naming rules and returned.
-   * <p>
-   * 3. This file is inserted between two files. If the timestamp in the name of the file satisfies
-   * the timestamp between the timestamps in the name of the two files, then it is a legal name and
-   * returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the
-   * version number is the version number in the tsfile with a larger timestamp.
+   *
+   * <p>For the sorting rules of tsfile names, see {@link #compareFileName}. The list can be
+   * restored from the file names while keeping the order correct, so there are three cases.
+   *
+   * <p>1. The tsfile is to be inserted in the first place of the list. If the timestamp in the file
+   * name is less than the timestamp in the file name of the first tsfile in the list, then the file
+   * name is legal and the file name is returned directly. Otherwise, its timestamp can be set to
+   * half of the timestamp value in the file name of the first tsfile in the list, and the version
+   * number is the version number in the file name of the first tsfile in the list.
+   *
+   * <p>2. The tsfile is to be inserted in the last place of the list. If the timestamp in the file
+   * name is larger than the timestamp in the file name of the last tsfile in the list, then the
+   * file name is legal and the file name is returned directly. Otherwise, the file name is
+   * generated by the system according to the naming rules and returned.
+   *
+   * <p>3. This file is inserted between two files. If the timestamp in the name of the file
+   * satisfies the timestamp between the timestamps in the name of the two files, then it is a legal
+   * name and returns directly; otherwise, the time stamp is the mean of the timestamps of the two
+   * files, the version number is the version number in the tsfile with a larger timestamp.
    *
    * @param tsfileName origin tsfile name
    * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
-   * 1]
+   *     1]
    * @return appropriate filename
    */
-  private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
-      long timePartitionId, List<TsFileResource> sequenceList) {
-    long currentTsFileTime = Long
-        .parseLong(tsfileName.split(FILE_NAME_SEPARATOR)[0]);
+  private String getFileNameForLoadingFile(
+      String tsfileName, int insertIndex, long timePartitionId, List<TsFileResource> sequenceList) {
+    long currentTsFileTime = Long.parseLong(tsfileName.split(FILE_NAME_SEPARATOR)[0]);
     long preTime;
     if (insertIndex == -1) {
       preTime = 0L;
@@ -2172,23 +2310,20 @@ public class StorageGroupProcessor {
       return preTime < currentTsFileTime ? tsfileName : getNewTsFileName(timePartitionId);
     } else {
       String subsequenceName = sequenceList.get(insertIndex + 1).getTsFile().getName();
-      long subsequenceTime = Long
-          .parseLong(subsequenceName.split(FILE_NAME_SEPARATOR)[0]);
-      long subsequenceVersion = Long
-          .parseLong(subsequenceName.split(FILE_NAME_SEPARATOR)[1]);
+      long subsequenceTime = Long.parseLong(subsequenceName.split(FILE_NAME_SEPARATOR)[0]);
+      long subsequenceVersion = Long.parseLong(subsequenceName.split(FILE_NAME_SEPARATOR)[1]);
       if (preTime < currentTsFileTime && currentTsFileTime < subsequenceTime) {
         return tsfileName;
       } else {
-        return getNewTsFileName(preTime + ((subsequenceTime - preTime) >> 1), subsequenceVersion,
-            0);
+        return getNewTsFileName(
+            preTime + ((subsequenceTime - preTime) >> 1), subsequenceVersion, 0);
       }
     }
   }
 
   /**
-   * Update latest time in latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
-   *
-   * @UsedBy sync module, load external tsfile module.
+   * Update latest time in latestTimeForEachDevice and
+   * partitionLatestFlushedTimeForEachDevice. Used by the sync and load external tsfile modules.
    */
   private void updateLatestTimeMap(TsFileResource newTsFileResource) {
     for (Entry<String, Integer> entry : newTsFileResource.getDeviceToIndexMap().entrySet()) {
@@ -2196,18 +2331,20 @@ public class StorageGroupProcessor {
       int index = entry.getValue();
       long endTime = newTsFileResource.getEndTime(index);
       long timePartitionId = StorageEngine.getTimePartition(endTime);
-      if (!latestTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>())
-          .containsKey(device)
+      if (!latestTimeForEachDevice
+              .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+              .containsKey(device)
           || latestTimeForEachDevice.get(timePartitionId).get(device) < endTime) {
         latestTimeForEachDevice.get(timePartitionId).put(device, endTime);
       }
 
-      Map<String, Long> latestFlushTimeForPartition = partitionLatestFlushedTimeForEachDevice
-          .getOrDefault(timePartitionId, new HashMap<>());
+      Map<String, Long> latestFlushTimeForPartition =
+          partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new HashMap<>());
 
       if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) {
         partitionLatestFlushedTimeForEachDevice
-            .computeIfAbsent(timePartitionId, id -> new HashMap<>()).put(device, endTime);
+            .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+            .put(device, endTime);
       }
       if (globalLatestFlushedTimeForEachDevice.getOrDefault(device, Long.MIN_VALUE) < endTime) {
         globalLatestFlushedTimeForEachDevice.put(device, endTime);
@@ -2221,18 +2358,21 @@ public class StorageGroupProcessor {
    * @param type load type
    * @param tsFileResource tsfile resource to be loaded
    * @param filePartitionId the partition id of the new file
-   * @return load the file successfully
-   * @UsedBy sync module, load external tsfile module.
+   * @return whether the file is loaded successfully. Used by sync and load external tsfile modules.
    */
-  private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
-      TsFileResource tsFileResource, long filePartitionId)
+  private boolean loadTsFileByType(
+      LoadTsFileType type, File syncedTsFile, TsFileResource tsFileResource, long filePartitionId)
       throws LoadFileException, DiskSpaceInsufficientException {
     File targetFile;
     switch (type) {
       case LOAD_UNSEQUENCE:
-        targetFile = fsFactory
-            .getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
-                storageGroupName + File.separatorChar + filePartitionId + File.separator
+        targetFile =
+            fsFactory.getFile(
+                DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+                storageGroupName
+                    + File.separatorChar
+                    + filePartitionId
+                    + File.separator
                     + tsFileResource.getTsFile().getName());
         tsFileResource.setFile(targetFile);
         if (tsFileManagement.contains(tsFileResource, false)) {
@@ -2240,13 +2380,19 @@ public class StorageGroupProcessor {
           return false;
         }
         tsFileManagement.add(tsFileResource, false);
-        logger.info("Load tsfile in unsequence list, move file from {} to {}",
-            syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
+        logger.info(
+            "Load tsfile in unsequence list, move file from {} to {}",
+            syncedTsFile.getAbsolutePath(),
+            targetFile.getAbsolutePath());
         break;
       case LOAD_SEQUENCE:
         targetFile =
-            fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
-                storageGroupName + File.separatorChar + filePartitionId + File.separator
+            fsFactory.getFile(
+                DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+                storageGroupName
+                    + File.separatorChar
+                    + filePartitionId
+                    + File.separator
                     + tsFileResource.getTsFile().getName());
         tsFileResource.setFile(targetFile);
         if (tsFileManagement.contains(tsFileResource, true)) {
@@ -2254,12 +2400,13 @@ public class StorageGroupProcessor {
           return false;
         }
         tsFileManagement.add(tsFileResource, true);
-        logger.info("Load tsfile in sequence list, move file from {} to {}",
-            syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
+        logger.info(
+            "Load tsfile in sequence list, move file from {} to {}",
+            syncedTsFile.getAbsolutePath(),
+            targetFile.getAbsolutePath());
         break;
       default:
-        throw new LoadFileException(
-            String.format("Unsupported type of loading tsfile : %s", type));
+        throw new LoadFileException(String.format("Unsupported type of loading tsfile : %s", type));
     }
 
     // move file from sync dir to data dir
@@ -2269,46 +2416,55 @@ public class StorageGroupProcessor {
     try {
       FileUtils.moveFile(syncedTsFile, targetFile);
     } catch (IOException e) {
-      logger.error("File renaming failed when loading tsfile. Origin: {}, Target: {}",
-          syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e);
-      throw new LoadFileException(String.format(
-          "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s",
-          syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
+      logger.error(
+          "File renaming failed when loading tsfile. Origin: {}, Target: {}",
+          syncedTsFile.getAbsolutePath(),
+          targetFile.getAbsolutePath(),
+          e);
+      throw new LoadFileException(
+          String.format(
+              "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s",
+              syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
     }
 
-    File syncedResourceFile = fsFactory.getFile(
-        syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
-    File targetResourceFile = fsFactory.getFile(
-        targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    File syncedResourceFile =
+        fsFactory.getFile(syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    File targetResourceFile =
+        fsFactory.getFile(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     try {
       FileUtils.moveFile(syncedResourceFile, targetResourceFile);
     } catch (IOException e) {
-      logger.error("File renaming failed when loading .resource file. Origin: {}, Target: {}",
-          syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e);
-      throw new LoadFileException(String.format(
-          "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s",
-          syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
-          e.getMessage()));
-    }
-    partitionDirectFileVersions.computeIfAbsent(filePartitionId,
-        p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
+      logger.error(
+          "File renaming failed when loading .resource file. Origin: {}, Target: {}",
+          syncedResourceFile.getAbsolutePath(),
+          targetResourceFile.getAbsolutePath(),
+          e);
+      throw new LoadFileException(
+          String.format(
+              "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s",
+              syncedResourceFile.getAbsolutePath(),
+              targetResourceFile.getAbsolutePath(),
+              e.getMessage()));
+    }
+    partitionDirectFileVersions
+        .computeIfAbsent(filePartitionId, p -> new HashSet<>())
+        .addAll(tsFileResource.getHistoricalVersions());
     if (!tsFileResource.getHistoricalVersions().isEmpty()) {
-      updatePartitionFileVersion(filePartitionId,
-          tsFileResource.getMaxVersion());
+      updatePartitionFileVersion(filePartitionId, tsFileResource.getMaxVersion());
     }
     return true;
   }
 
   /**
    * Delete tsfile if it exists.
-   * <p>
-   * Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
-   * <p>
-   * Secondly, delete the tsfile and .resource file.
+   *
+   * <p>Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
+   *
+   * <p>Secondly, delete the tsfile and .resource file.
    *
    * @param tsfieToBeDeleted tsfile to be deleted
-   * @return whether the file to be deleted exists.
-   * @UsedBy sync module, load external tsfile module.
+   * @return whether the file to be deleted exists. Used by the sync module and the load external
+   *     tsfile module.
    */
   public boolean deleteTsfile(File tsfieToBeDeleted) {
     tsFileManagement.writeLock();
@@ -2352,21 +2508,19 @@ public class StorageGroupProcessor {
     return true;
   }
 
-
   public Collection<TsFileProcessor> getWorkSequenceTsFileProcessors() {
     return workSequenceTsFileProcessors.values();
   }
 
   /**
    * Move tsfile to the target directory if it exists.
-   * <p>
-   * Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
-   * <p>
-   * Secondly, move the tsfile and .resource file to the target directory.
+   *
+   * <p>Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
+   *
+   * <p>Secondly, move the tsfile and .resource file to the target directory.
    *
    * @param fileToBeMoved tsfile to be moved
-   * @return whether the file to be moved exists.
-   * @UsedBy load external tsfile module.
+   * @return whether the file to be moved exists. Used by the load external tsfile module.
    */
   public boolean moveTsfile(File fileToBeMoved, File targetDir) {
     tsFileManagement.writeLock();
@@ -2403,17 +2557,16 @@ public class StorageGroupProcessor {
     tsFileResourceToBeMoved.writeLock();
     try {
       tsFileResourceToBeMoved.moveTo(targetDir);
-      logger
-          .info("Move tsfile {} to target dir {} successfully.",
-              tsFileResourceToBeMoved.getTsFile(),
-              targetDir.getPath());
+      logger.info(
+          "Move tsfile {} to target dir {} successfully.",
+          tsFileResourceToBeMoved.getTsFile(),
+          targetDir.getPath());
     } finally {
       tsFileResourceToBeMoved.writeUnlock();
     }
     return true;
   }
 
-
   public Collection<TsFileProcessor> getWorkUnsequenceTsFileProcessors() {
     return workUnsequenceTsFileProcessors.values();
   }
@@ -2447,8 +2600,8 @@ public class StorageGroupProcessor {
    * "tsFileResource" may have unwritten data of that file.
    *
    * @return true if the historicalVersions of "tsFileResource" is a subset of
-   * partitionDirectFileVersions, or false if it is not a subset and it contains any version of a
-   * working file USED by cluster module
+   *     partitionDirectFileVersions, or false if it is not a subset and it contains any version of
+   *     a working file. Used by the cluster module.
    */
   public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) {
     // consider the case: The local node crashes when it is writing TsFile no.5.
@@ -2468,16 +2621,16 @@ public class StorageGroupProcessor {
         return false;
       }
     }
-    Set<Long> partitionFileVersions = partitionDirectFileVersions
-        .getOrDefault(partitionNum, Collections.emptySet());
-    logger.debug("FileVersions/PartitionVersions: {}/{}", tsFileResource.getHistoricalVersions(),
+    Set<Long> partitionFileVersions =
+        partitionDirectFileVersions.getOrDefault(partitionNum, Collections.emptySet());
+    logger.debug(
+        "FileVersions/PartitionVersions: {}/{}",
+        tsFileResource.getHistoricalVersions(),
         partitionFileVersions);
     return partitionFileVersions.containsAll(tsFileResource.getHistoricalVersions());
   }
 
-  /**
-   * remove all partitions that satisfy a filter.
-   */
+  /** remove all partitions that satisfy a filter. */
   public void removePartitions(TimePartitionFilter filter) {
     // this requires blocking all other activities
     tsFileManagement.writeLock();
@@ -2499,9 +2652,9 @@ public class StorageGroupProcessor {
     }
   }
 
-  //may remove the processorEntrys
-  private void removePartitions(TimePartitionFilter filter,
-      Set<Entry<Long, TsFileProcessor>> processorEntrys) {
+  // may remove the processorEntrys
+  private void removePartitions(
+      TimePartitionFilter filter, Set<Entry<Long, TsFileProcessor>> processorEntrys) {
     for (Iterator<Entry<Long, TsFileProcessor>> iterator = processorEntrys.iterator();
         iterator.hasNext(); ) {
       Entry<Long, TsFileProcessor> longTsFileProcessorEntry = iterator.next();
@@ -2511,13 +2664,14 @@ public class StorageGroupProcessor {
         processor.syncClose();
         iterator.remove();
         updateLatestFlushTimeToPartition(partitionId, Long.MIN_VALUE);
-        logger.debug("{} is removed during deleting partitions",
+        logger.debug(
+            "{} is removed during deleting partitions",
             processor.getTsFileResource().getTsFilePath());
       }
     }
   }
 
-  //may remove the iterator's data
+  // may remove the iterator's data
   private void removePartitions(TimePartitionFilter filter, Iterator<TsFileResource> iterator) {
     while (iterator.hasNext()) {
       TsFileResource tsFileResource = iterator.next();
@@ -2545,25 +2699,27 @@ public class StorageGroupProcessor {
       boolean isSequence = false;
       for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
         if (!isAlive(plan.getTime())) {
-          //we do not need to write these part of data, as they can not be queried
+          // we do not need to write this part of the data, as it cannot be queried
           continue;
         }
         // init map
         long timePartitionId = StorageEngine.getTimePartition(plan.getTime());
 
-        partitionLatestFlushedTimeForEachDevice
-            .computeIfAbsent(timePartitionId, id -> new HashMap<>());
-        //as the plans have been ordered, and we have get the write lock,
-        //So, if a plan is sequenced, then all the rest plans are sequenced.
+        partitionLatestFlushedTimeForEachDevice.computeIfAbsent(
+            timePartitionId, id -> new HashMap<>());
+        // as the plans have been ordered, and we have got the write lock,
+        // if a plan is sequenced, then all the remaining plans are sequenced.
         //
         if (!isSequence) {
           isSequence =
-              plan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
-                  .getOrDefault(plan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+              plan.getTime()
+                  > partitionLatestFlushedTimeForEachDevice
+                      .get(timePartitionId)
+                      .getOrDefault(plan.getDeviceId().getFullPath(), Long.MIN_VALUE);
         }
-        //is unsequence and user set config to discard out of order data
-        if (!isSequence && IoTDBDescriptor.getInstance().getConfig()
-            .isEnableDiscardOutOfOrderData()) {
+        // is unsequence and user set config to discard out of order data
+        if (!isSequence
+            && IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
           return;
         }
 
@@ -2574,11 +2730,11 @@ public class StorageGroupProcessor {
     } finally {
       writeUnlock();
     }
-
   }
 
   private enum LoadTsFileType {
-    LOAD_SEQUENCE, LOAD_UNSEQUENCE
+    LOAD_SEQUENCE,
+    LOAD_UNSEQUENCE
   }
 
   @FunctionalInterface
@@ -2611,13 +2767,11 @@ public class StorageGroupProcessor {
     boolean satisfy(String storageGroupName, long timePartitionId);
   }
 
-  public void setCustomCloseFileListeners(
-      List<CloseFileListener> customCloseFileListeners) {
+  public void setCustomCloseFileListeners(List<CloseFileListener> customCloseFileListeners) {
     this.customCloseFileListeners = customCloseFileListeners;
   }
 
-  public void setCustomFlushListeners(
-      List<FlushListener> customFlushListeners) {
+  public void setCustomFlushListeners(List<FlushListener> customFlushListeners) {
     this.customFlushListeners = customFlushListeners;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 8cd8d1a..4a9f513 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -26,11 +26,11 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -65,7 +65,8 @@ public class StorageGroupProcessorTest {
 
   @Before
   public void setUp() throws Exception {
-    IoTDBDescriptor.getInstance().getConfig()
+    IoTDBDescriptor.getInstance()
+        .getConfig()
         .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
     MetadataManagerHelper.initMetadata();
     EnvironmentUtils.envSetUp();
@@ -80,7 +81,8 @@ public class StorageGroupProcessorTest {
     EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
     MergeManager.getINSTANCE().stop();
     EnvironmentUtils.cleanEnv();
-    IoTDBDescriptor.getInstance().getConfig()
+    IoTDBDescriptor.getInstance()
+        .getConfig()
         .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
   }
 
@@ -118,9 +120,14 @@ public class StorageGroupProcessorTest {
 
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
     for (TsFileProcessor tsfileProcessor : processor.getWorkUnsequenceTsFileProcessors()) {
-      tsfileProcessor
-          .query(deviceId, measurementId, TSDataType.INT32, TSEncoding.RLE, Collections.emptyMap(),
-              new QueryContext(), tsfileResourcesForQuery);
+      tsfileProcessor.query(
+          deviceId,
+          measurementId,
+          TSDataType.INT32,
+          TSEncoding.RLE,
+          Collections.emptyMap(),
+          new QueryContext(),
+          tsfileResourcesForQuery);
     }
 
     Assert.assertEquals(1, tsfileResourcesForQuery.size());
@@ -152,9 +159,8 @@ public class StorageGroupProcessorTest {
       e.printStackTrace();
     }
     processor.syncCloseAllWorkingTsFileProcessors();
-    QueryDataSource queryDataSource = processor
-        .query(new PartialPath(deviceId), measurementId, context,
-            null, null);
+    QueryDataSource queryDataSource =
+        processor.query(new PartialPath(deviceId), measurementId, context, null, null);
     Assert.assertEquals(10, queryDataSource.getSeqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
       Assert.assertTrue(resource.isClosed());
@@ -163,7 +169,7 @@ public class StorageGroupProcessorTest {
 
   @Test
   public void testInsertDataAndRemovePartitionAndInsert()
-          throws WriteProcessException, QueryProcessException, IllegalPathException {
+      throws WriteProcessException, QueryProcessException, IllegalPathException {
     for (int j = 0; j < 10; j++) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
@@ -182,9 +188,8 @@ public class StorageGroupProcessorTest {
     }
     processor.syncCloseAllWorkingTsFileProcessors();
 
-    QueryDataSource queryDataSource = processor
-            .query(new PartialPath(deviceId), measurementId, context,
-                    null, null);
+    QueryDataSource queryDataSource =
+        processor.query(new PartialPath(deviceId), measurementId, context, null, null);
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
   }
 
@@ -200,14 +205,15 @@ public class StorageGroupProcessorTest {
     dataTypes.add(TSDataType.INT64.ordinal());
 
     MeasurementMNode[] measurementMNodes = new MeasurementMNode[2];
-    measurementMNodes[0] = new MeasurementMNode(null, "s0",
-        new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
-    measurementMNodes[1] = new MeasurementMNode(null, "s1",
-        new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
-
-    InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
-        measurements,
-        dataTypes);
+    measurementMNodes[0] =
+        new MeasurementMNode(
+            null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
+    measurementMNodes[1] =
+        new MeasurementMNode(
+            null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
+
+    InsertTabletPlan insertTabletPlan1 =
+        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
     insertTabletPlan1.setMeasurementMNodes(measurementMNodes);
 
     long[] times = new long[100];
@@ -227,9 +233,8 @@ public class StorageGroupProcessorTest {
     processor.insertTablet(insertTabletPlan1);
     processor.asyncCloseAllWorkingTsFileProcessors();
 
-    InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
-        measurements,
-        dataTypes);
+    InsertTabletPlan insertTabletPlan2 =
+        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
     insertTabletPlan2.setMeasurementMNodes(measurementMNodes);
 
     for (int r = 50; r < 149; r++) {
@@ -245,9 +250,8 @@ public class StorageGroupProcessorTest {
     processor.asyncCloseAllWorkingTsFileProcessors();
     processor.syncCloseAllWorkingTsFileProcessors();
 
-    QueryDataSource queryDataSource = processor
-        .query(new PartialPath(deviceId), measurementId, context,
-            null, null);
+    QueryDataSource queryDataSource =
+        processor.query(new PartialPath(deviceId), measurementId, context, null, null);
 
     Assert.assertEquals(2, queryDataSource.getSeqResources().size());
     Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
@@ -256,7 +260,6 @@ public class StorageGroupProcessorTest {
     }
   }
 
-
   @Test
   public void testSeqAndUnSeqSyncClose()
       throws WriteProcessException, QueryProcessException, IllegalPathException {
@@ -278,9 +281,8 @@ public class StorageGroupProcessorTest {
 
     processor.syncCloseAllWorkingTsFileProcessors();
 
-    QueryDataSource queryDataSource = processor
-        .query(new PartialPath(deviceId), measurementId, context,
-            null, null);
+    QueryDataSource queryDataSource =
+        processor.query(new PartialPath(deviceId), measurementId, context, null, null);
     Assert.assertEquals(10, queryDataSource.getSeqResources().size());
     Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
@@ -319,9 +321,8 @@ public class StorageGroupProcessorTest {
       tsfileProcessor.syncFlush();
     }
 
-    QueryDataSource queryDataSource = processor
-        .query(new PartialPath(deviceId), measurementId, context,
-            null, null);
+    QueryDataSource queryDataSource =
+        processor.query(new PartialPath(deviceId), measurementId, context, null, null);
     Assert.assertEquals(10, queryDataSource.getSeqResources().size());
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
@@ -353,14 +354,15 @@ public class StorageGroupProcessorTest {
     dataTypes.add(TSDataType.INT64.ordinal());
 
     MeasurementMNode[] measurementMNodes = new MeasurementMNode[2];
-    measurementMNodes[0] = new MeasurementMNode(null, "s0",
-        new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
-    measurementMNodes[1] = new MeasurementMNode(null, "s1",
-        new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
+    measurementMNodes[0] =
+        new MeasurementMNode(
+            null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
+    measurementMNodes[1] =
+        new MeasurementMNode(
+            null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
 
-    InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
-        measurements,
-        dataTypes);
+    InsertTabletPlan insertTabletPlan1 =
+        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
 
     long[] times = new long[100];
     Object[] columns = new Object[2];
@@ -380,9 +382,8 @@ public class StorageGroupProcessorTest {
     processor.insertTablet(insertTabletPlan1);
     processor.asyncCloseAllWorkingTsFileProcessors();
 
-    InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
-        measurements,
-        dataTypes);
+    InsertTabletPlan insertTabletPlan2 =
+        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
 
     for (int r = 149; r >= 50; r--) {
       times[r - 50] = r;
@@ -402,9 +403,8 @@ public class StorageGroupProcessorTest {
       tsfileProcessor.syncFlush();
     }
 
-    QueryDataSource queryDataSource = processor
-        .query(new PartialPath(deviceId), measurementId, context,
-            null, null);
+    QueryDataSource queryDataSource =
+        processor.query(new PartialPath(deviceId), measurementId, context, null, null);
 
     Assert.assertEquals(2, queryDataSource.getSeqResources().size());
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -436,14 +436,15 @@ public class StorageGroupProcessorTest {
     dataTypes.add(TSDataType.INT64.ordinal());
 
     MeasurementMNode[] measurementMNodes = new MeasurementMNode[2];
-    measurementMNodes[0] = new MeasurementMNode(null, "s0",
-        new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
-    measurementMNodes[1] = new MeasurementMNode(null, "s1",
-        new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
+    measurementMNodes[0] =
+        new MeasurementMNode(
+            null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
+    measurementMNodes[1] =
+        new MeasurementMNode(
+            null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
 
-    InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
-        measurements,
-        dataTypes);
+    InsertTabletPlan insertTabletPlan1 =
+        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
 
     long[] times = new long[1200];
     Object[] columns = new Object[2];
@@ -463,9 +464,8 @@ public class StorageGroupProcessorTest {
     processor.insertTablet(insertTabletPlan1);
     processor.asyncCloseAllWorkingTsFileProcessors();
 
-    InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
-        measurements,
-        dataTypes);
+    InsertTabletPlan insertTabletPlan2 =
+        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
 
     for (int r = 1249; r >= 50; r--) {
       times[r - 50] = r;
@@ -485,9 +485,8 @@ public class StorageGroupProcessorTest {
       tsfileProcessor.syncFlush();
     }
 
-    QueryDataSource queryDataSource = processor
-        .query(new PartialPath(deviceId), measurementId, context,
-            null, null);
+    QueryDataSource queryDataSource =
+        processor.query(new PartialPath(deviceId), measurementId, context, null, null);
 
     Assert.assertEquals(2, queryDataSource.getSeqResources().size());
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -519,14 +518,15 @@ public class StorageGroupProcessorTest {
     dataTypes.add(TSDataType.INT64.ordinal());
 
     MeasurementMNode[] measurementMNodes = new MeasurementMNode[2];
-    measurementMNodes[0] = new MeasurementMNode(null, "s0",
-        new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
-    measurementMNodes[1] = new MeasurementMNode(null, "s1",
-        new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
+    measurementMNodes[0] =
+        new MeasurementMNode(
+            null, "s0", new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
+    measurementMNodes[1] =
+        new MeasurementMNode(
+            null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
 
-    InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
-        measurements,
-        dataTypes);
+    InsertTabletPlan insertTabletPlan1 =
+        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
 
     long[] times = new long[1200];
     Object[] columns = new Object[2];
@@ -546,9 +546,8 @@ public class StorageGroupProcessorTest {
     processor.insertTablet(insertTabletPlan1);
     processor.asyncCloseAllWorkingTsFileProcessors();
 
-    InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new PartialPath("root.vehicle.d0"),
-        measurements,
-        dataTypes);
+    InsertTabletPlan insertTabletPlan2 =
+        new InsertTabletPlan(new PartialPath("root.vehicle.d0"), measurements, dataTypes);
 
     for (int r = 1249; r >= 50; r--) {
       times[r - 50] = r;
@@ -568,9 +567,8 @@ public class StorageGroupProcessorTest {
       tsfileProcessor.syncFlush();
     }
 
-    QueryDataSource queryDataSource = processor
-        .query(new PartialPath(deviceId), measurementId, context,
-            null, null);
+    QueryDataSource queryDataSource =
+        processor.query(new PartialPath(deviceId), measurementId, context, null, null);
 
     Assert.assertEquals(2, queryDataSource.getSeqResources().size());
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -607,11 +605,10 @@ public class StorageGroupProcessorTest {
       // wait
     }
 
-    QueryDataSource queryDataSource = processor
-        .query(new PartialPath(deviceId), measurementId, context,
-            null, null);
+    QueryDataSource queryDataSource =
+        processor.query(new PartialPath(deviceId), measurementId, context, null, null);
     Assert.assertEquals(10, queryDataSource.getSeqResources().size());
-    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
+    Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
       Assert.assertTrue(resource.isClosed());
     }
@@ -625,6 +622,5 @@ public class StorageGroupProcessorTest {
     DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException {
       super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy());
     }
-
   }
-}
\ No newline at end of file
+}