You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/06/23 10:02:19 UTC

[incubator-iotdb] branch hot_compaction updated: update StorageGroupProcessor

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch hot_compaction
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/hot_compaction by this push:
     new a10b404  update StorageGroupProcessor
     new d08b086  Merge pull request #1410 from zhanglingzhe0820/add_vm_merge
a10b404 is described below

commit a10b404edce4c5d5d85a16b225df1e47eb7a1ecb
Author: 张凌哲 <zh...@bytedance.com>
AuthorDate: Tue Jun 23 17:57:37 2020 +0800

    update StorageGroupProcessor
---
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  10 -
 .../engine/storagegroup/StorageGroupProcessor.java | 230 ++++++++++++---------
 2 files changed, 137 insertions(+), 103 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 0ead37f..cf84a6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -23,14 +23,11 @@ import static org.apache.iotdb.db.utils.MergeUtils.writeBatchPoint;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_UPGRADE;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SEPARATOR;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.VM_SUFFIX;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -41,14 +38,9 @@ import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.runtime.FlushRunTimeException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.metadata.mnode.InternalMNode;
-import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.reader.chunk.ChunkDataIterator;
 import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -57,8 +49,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 009acac..ae12100 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -20,7 +20,9 @@ package org.apache.iotdb.db.engine.storagegroup;
 
 import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
 import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SEPARATOR;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.VM_SUFFIX;
 
 import java.io.File;
 import java.io.IOException;
@@ -229,22 +231,20 @@ public class StorageGroupProcessor {
   private TsFileFlushPolicy fileFlushPolicy;
 
   /**
-   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close,
-   * not including the files generated by merge) of each partition.
-   * As data file close is managed by the leader in the distributed version, the files with the
-   * same version(s) have the same data, despite that the inner structure (the size and
-   * organization of chunks) may be different, so we can easily find what remote files we do not
-   * have locally.
-   * partition number -> version number set
+   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, not
+   * including the files generated by merge) of each partition. As data file close is managed by the
+   * leader in the distributed version, the files with the same version(s) have the same data,
+   * despite that the inner structure (the size and organization of chunks) may be different, so we
+   * can easily find what remote files we do not have locally. partition number -> version number
+   * set
    */
   private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
 
   /**
-   * The max file versions in each partition. By recording this, if several IoTDB instances have
-   * the same policy of closing file and their ingestion is identical, then files of the same
-   * version in different IoTDB instance will have identical data, providing convenience for data
-   * comparison across different instances.
-   * partition number -> max version number
+   * The max file versions in each partition. By recording this, if several IoTDB instances have the
+   * same policy of closing file and their ingestion is identical, then files of the same version in
+   * different IoTDB instance will have identical data, providing convenience for data comparison
+   * across different instances. partition number -> max version number
    */
   private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
 
@@ -272,27 +272,33 @@ public class StorageGroupProcessor {
     try {
       // collect candidate TsFiles from sequential and unsequential data directory
       Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair = getAllFiles(
-              DirectoryManager.getInstance().getAllSequenceFileFolders());
+          DirectoryManager.getInstance().getAllSequenceFileFolders());
       List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
       List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
       upgradeSeqFileList.addAll(oldSeqTsFiles);
       Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair = getAllFiles(
-              DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+          DirectoryManager.getInstance().getAllUnSequenceFileFolders());
       List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
       List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
       upgradeUnseqFileList.addAll(oldUnseqTsFiles);
+      Map<String, List<TsFileResource>> vmSeqFiles = getAllVms(
+          DirectoryManager.getInstance().getAllSequenceFileFolders());
+      Map<String, List<TsFileResource>> vmUnseqFiles = getAllVms(
+          DirectoryManager.getInstance().getAllUnSequenceFileFolders());
 
-      recoverSeqFiles(tmpSeqTsFiles);
-      recoverUnseqFiles(tmpUnseqTsFiles);
+      recoverSeqFiles(tmpSeqTsFiles, vmSeqFiles);
+      recoverUnseqFiles(tmpUnseqTsFiles, vmUnseqFiles);
 
       for (TsFileResource resource : sequenceFileTreeSet) {
         long partitionNum = resource.getTimePartition();
-        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+            .addAll(resource.getHistoricalVersions());
         updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
       }
       for (TsFileResource resource : unSequenceFileList) {
         long partitionNum = resource.getTimePartition();
-        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+            .addAll(resource.getHistoricalVersions());
         updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
       }
 
@@ -317,7 +323,6 @@ public class StorageGroupProcessor {
       throw new StorageGroupProcessorException(e);
     }
 
-
     for (TsFileResource resource : sequenceFileTreeSet) {
       long timePartitionId = resource.getTimePartition();
       Map<String, Long> endTimeMap = new HashMap<>();
@@ -346,11 +351,11 @@ public class StorageGroupProcessor {
   /**
    * use old seq file to update latestTimeForEachDevice, globalLatestFlushedTimeForEachDevice,
    * partitionLatestFlushedTimeForEachDevice and timePartitionIdVersionControllerMap
-   *
    */
   private void updateLastestFlushedTime() throws IOException {
 
-    VersionController versionController = new SimpleFileVersionController(storageGroupSysDir.getPath());
+    VersionController versionController = new SimpleFileVersionController(
+        storageGroupSysDir.getPath());
     long currentVersion = versionController.currVersion();
     for (TsFileResource resource : upgradeSeqFileList) {
       for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) {
@@ -359,24 +364,27 @@ public class StorageGroupProcessor {
         long endTime = resource.getEndTime(index);
         long endTimePartitionId = StorageEngine.getTimePartition(endTime);
         latestTimeForEachDevice.computeIfAbsent(endTimePartitionId, l -> new HashMap<>())
-                .put(deviceId, endTime);
+            .put(deviceId, endTime);
         globalLatestFlushedTimeForEachDevice.put(deviceId, endTime);
 
         // set all the covered partition's LatestFlushedTime to Long.MAX_VALUE
         long partitionId = StorageEngine.getTimePartition(resource.getStartTime(index));
         while (partitionId <= endTimePartitionId) {
           partitionLatestFlushedTimeForEachDevice.computeIfAbsent(partitionId, l -> new HashMap<>())
-                  .put(deviceId, Long.MAX_VALUE);
+              .put(deviceId, Long.MAX_VALUE);
           if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
-            File directory = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, String.valueOf(partitionId));
-            if(!directory.exists()){
+            File directory = SystemFileFactory.INSTANCE
+                .getFile(storageGroupSysDir, String.valueOf(partitionId));
+            if (!directory.exists()) {
               directory.mkdirs();
             }
-            File versionFile = SystemFileFactory.INSTANCE.getFile(directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
+            File versionFile = SystemFileFactory.INSTANCE
+                .getFile(directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
             if (!versionFile.createNewFile()) {
               logger.warn("Version file {} has already been created ", versionFile);
             }
-            timePartitionIdVersionControllerMap.put(partitionId, new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
+            timePartitionIdVersionControllerMap.put(partitionId,
+                new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
           }
           partitionId++;
         }
@@ -402,7 +410,8 @@ public class StorageGroupProcessor {
         });
   }
 
-  private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders) throws IOException {
+  private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders)
+      throws IOException {
     List<File> tsFiles = new ArrayList<>();
     List<File> upgradeFiles = new ArrayList<>();
     for (String baseDir : folders) {
@@ -420,9 +429,12 @@ public class StorageGroupProcessor {
       // the process was interrupted before the merged files could be named
       continueFailedRenames(fileFolder, MERGE_SUFFIX);
 
-      File[] oldTsfileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
-      File[] oldResourceFileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
-      File[] oldModificationFileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX);
+      File[] oldTsfileArray = fsFactory
+          .listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
+      File[] oldResourceFileArray = fsFactory
+          .listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
+      File[] oldModificationFileArray = fsFactory
+          .listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX);
       File upgradeFolder = fsFactory.getFile(fileFolder, IoTDBConstant.UPGRADE_FOLDER_NAME);
       // move the old files to upgrade folder if exists
       if (oldTsfileArray.length != 0 || oldResourceFileArray.length != 0) {
@@ -498,6 +510,34 @@ public class StorageGroupProcessor {
     return new Pair<>(ret, upgradeRet);
   }
 
+  private Map<String, List<TsFileResource>> getAllVms(List<String> folders) throws IOException {
+    List<File> vmFiles = new ArrayList<>();
+    for (String baseDir : folders) {
+      File fileFolder = fsFactory.getFile(baseDir, storageGroupName);
+      if (!fileFolder.exists()) {
+        continue;
+      }
+      Collections
+          .addAll(vmFiles, fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), VM_SUFFIX));
+    }
+
+    Map<String, List<TsFileResource>> vmTsFileResourceMap = new HashMap<>();
+    for (File f : vmFiles) {
+      TsFileResource fileResource = new TsFileResource(f);
+      fileResource.setClosed(false);
+      // make sure the flush command is called before IoTDB is down.
+      fileResource.deserialize();
+      String tsfilePrefix = f.getName().split(TSFILE_SEPARATOR)[0];
+      List<TsFileResource> vmTsFileResource = new ArrayList<>();
+      if (vmTsFileResourceMap.containsKey(tsfilePrefix)) {
+        vmTsFileResource = vmTsFileResourceMap.get(tsfilePrefix);
+      }
+      vmTsFileResource.add(fileResource);
+      vmTsFileResourceMap.put(tsfilePrefix, vmTsFileResource);
+    }
+    return vmTsFileResourceMap;
+  }
+
   private void continueFailedRenames(File fileFolder, String suffix) {
     File[] files = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), suffix);
     if (files != null) {
@@ -512,7 +552,8 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void recoverSeqFiles(List<TsFileResource> tsFiles) {
+  private void recoverSeqFiles(List<TsFileResource> tsFiles,
+      Map<String, List<TsFileResource>> vmFiles) {
     for (int i = 0; i < tsFiles.size(); i++) {
       TsFileResource tsFileResource = tsFiles.get(i);
       long timePartitionId = tsFileResource.getTimePartition();
@@ -533,7 +574,9 @@ public class StorageGroupProcessor {
         tsFileResource.setClosed(true);
       } else if (writer.canWrite()) {
         // the last file is not closed, continue writing to in
+        String tsfilePrefix = tsFileResource.getFile().getName().split(TSFILE_SEPARATOR)[0];
         TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
+            vmFiles.get(tsfilePrefix),
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
             this::updateLatestFlushTimeCallback, true, writer);
@@ -548,7 +591,8 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void recoverUnseqFiles(List<TsFileResource> tsFiles) {
+  private void recoverUnseqFiles(List<TsFileResource> tsFiles,
+      Map<String, List<TsFileResource>> vmFiles) {
     for (int i = 0; i < tsFiles.size(); i++) {
       TsFileResource tsFileResource = tsFiles.get(i);
       long timePartitionId = tsFileResource.getTimePartition();
@@ -568,7 +612,9 @@ public class StorageGroupProcessor {
         tsFileResource.setClosed(true);
       } else if (writer.canWrite()) {
         // the last file is not closed, continue writing to in
+        String tsfilePrefix = tsFileResource.getFile().getName().split(TSFILE_SEPARATOR)[0];
         TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
+            vmFiles.get(tsfilePrefix),
             getVersionControllerByTimePartitionId(timePartitionId),
             this::closeUnsealedTsFileProcessorCallBack,
             this::unsequenceFlushCallback, false, writer);
@@ -611,7 +657,8 @@ public class StorageGroupProcessor {
       long timePartitionId = StorageEngine.getTimePartition(insertPlan.getTime());
 
       latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
-      partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
+      partitionLatestFlushedTimeForEachDevice
+          .computeIfAbsent(timePartitionId, id -> new HashMap<>());
 
       // insert to sequence or unSequence file
       insertToTsFileProcessor(insertPlan,
@@ -625,7 +672,7 @@ public class StorageGroupProcessor {
 
   /**
    * Insert a tablet (rows belonging to the same devices) into this storage group.
-   * @param insertTabletPlan
+   *
    * @throws WriteProcessException when update last cache failed
    * @throws BatchInsertionException if some of the rows failed to be inserted
    */
@@ -660,7 +707,8 @@ public class StorageGroupProcessor {
       // before is first start point
       int before = loc;
       // before time partition
-      long beforeTimePartition = StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]);
+      long beforeTimePartition = StorageEngine
+          .getTimePartition(insertTabletPlan.getTimes()[before]);
       // init map
       long lastFlushTime = partitionLatestFlushedTimeForEachDevice.
           computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
@@ -723,8 +771,8 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * insert batch to tsfile processor thread-safety that the caller need to guarantee
-   * The rows to be inserted are in the range [start, end)
+   * insert batch to tsfile processor thread-safety that the caller need to guarantee The rows to be
+   * inserted are in the range [start, end)
    *
    * @param insertTabletPlan insert a tablet of a device
    * @param sequence whether is sequence
@@ -948,12 +996,12 @@ public class StorageGroupProcessor {
     VersionController versionController = getVersionControllerByTimePartitionId(timePartitionId);
     if (sequence) {
       tsFileProcessor = new TsFileProcessor(storageGroupName,
-          fsFactory.getFileWithParent(filePath),
+          fsFactory.getFileWithParent(filePath), new ArrayList<>(),
           versionController, this::closeUnsealedTsFileProcessorCallBack,
           this::updateLatestFlushTimeCallback, true);
     } else {
       tsFileProcessor = new TsFileProcessor(storageGroupName,
-          fsFactory.getFileWithParent(filePath),
+          fsFactory.getFileWithParent(filePath), new ArrayList<>(),
           versionController, this::closeUnsealedTsFileProcessorCallBack,
           this::unsequenceFlushCallback, false);
     }
@@ -1321,7 +1369,8 @@ public class StorageGroupProcessor {
 
     int deviceIndex = tsFileResource.getDeviceToIndexMap().get(deviceId);
     long startTime = tsFileResource.getStartTime(deviceIndex);
-    long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex) : Long.MAX_VALUE;
+    long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex)
+        : Long.MAX_VALUE;
 
     if (!isAlive(endTime)) {
       return false;
@@ -1395,7 +1444,8 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void logDeletion(long timestamp, String deviceId, String measurementId, long timePartitionId)
+  private void logDeletion(long timestamp, String deviceId, String measurementId,
+      long timePartitionId)
       throws IOException {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
       DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId));
@@ -1477,7 +1527,8 @@ public class StorageGroupProcessor {
       partitionLatestFlushedTimeForEachDevice
           .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
           .put(entry.getKey(), entry.getValue());
-      updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(), entry.getKey(), entry.getValue());
+      updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+          entry.getKey(), entry.getValue());
       if (globalLatestFlushedTimeForEachDevice
           .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
         globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
@@ -1490,10 +1541,11 @@ public class StorageGroupProcessor {
   /**
    * used for upgrading
    */
-  public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId, String deviceId, long time) {
+  public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId,
+      String deviceId, long time) {
     newlyFlushedPartitionLatestFlushedTimeForEachDevice
-            .computeIfAbsent(partitionId, id -> new HashMap<>())
-            .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
+        .computeIfAbsent(partitionId, id -> new HashMap<>())
+        .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time));
   }
 
   /**
@@ -1547,8 +1599,8 @@ public class StorageGroupProcessor {
     for (TsFileResource resource : upgradedResources) {
       long partitionId = resource.getTimePartition();
       resource.getDeviceToIndexMap().forEach((device, index) ->
-        updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device,
-            resource.getEndTime(index))
+          updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device,
+              resource.getEndTime(index))
       );
     }
     insertLock.writeLock().lock();
@@ -1747,7 +1799,6 @@ public class StorageGroupProcessor {
 
   /**
    * acquire the write locks of the resource and the merge lock
-   * @param seqFile
    */
   private void doubleWriteLock(TsFileResource seqFile) {
     boolean fileLockGot;
@@ -1763,7 +1814,7 @@ public class StorageGroupProcessor {
         if (fileLockGot) {
           seqFile.writeUnlock();
         }
-        if(mergeLockGot) {
+        if (mergeLockGot) {
           mergeLock.writeLock().unlock();
         }
       }
@@ -1772,7 +1823,6 @@ public class StorageGroupProcessor {
 
   /**
    * release the write locks of the resource and the merge lock
-   * @param seqFile
    */
   private void doubleWriteUnlock(TsFileResource seqFile) {
     mergeLock.writeLock().unlock();
@@ -1798,7 +1848,7 @@ public class StorageGroupProcessor {
     mergeLock.writeLock().lock();
     try {
       if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
-          newFilePartitionId)){
+          newFilePartitionId)) {
         updateLatestTimeMap(newTsFileResource);
       }
     } catch (DiskSpaceInsufficientException e) {
@@ -1865,7 +1915,8 @@ public class StorageGroupProcessor {
       long partitionNum = newTsFileResource.getTimePartition();
       partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
           .addAll(newTsFileResource.getHistoricalVersions());
-      updatePartitionFileVersion(partitionNum, Collections.max(newTsFileResource.getHistoricalVersions()));
+      updatePartitionFileVersion(partitionNum,
+          Collections.max(newTsFileResource.getHistoricalVersions()));
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -1880,11 +1931,10 @@ public class StorageGroupProcessor {
 
   /**
    * Set the version in "partition" to "version" if "version" is larger than the current version.
-   * @param partition
-   * @param version
    */
   public void setPartitionFileVersionToMax(long partition, long version) {
-    partitionMaxFileVersions.compute(partition, (prt, oldVer) -> computeMaxVersion(oldVer, version));
+    partitionMaxFileVersions
+        .compute(partition, (prt, oldVer) -> computeMaxVersion(oldVer, version));
   }
 
   private long computeMaxVersion(Long oldVersion, Long newVersion) {
@@ -1895,12 +1945,12 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
-   * @param newTsFileResource
-   * @param newFilePartitionId
-   * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted
-   *         POS_OVERLAP(-3) if some file overlaps the new file
-   *         an insertion position i >= -1 if the new file can be inserted between [i, i+1]
+   * Find the position of "newTsFileResource" in the sequence files if it can be inserted into
+   * them.
+   *
+   * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted
+   * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
+   * file can be inserted between [i, i+1]
    */
   private int findInsertionPosition(TsFileResource newTsFileResource, long newFilePartitionId,
       List<TsFileResource> sequenceList) {
@@ -1941,11 +1991,9 @@ public class StorageGroupProcessor {
 
   /**
    * Compare each device in the two files to find the time relation of them.
-   * @param fileA
-   * @param fileB
-   * @return -1 if fileA is totally older than fileB (A < B)
-   *          0 if fileA is partially older than fileB and partially newer than fileB (A X B)
-   *          1 if fileA is totally newer than fileB (B < A)
+   *
+   * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than
+   * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)
    */
   private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
     boolean hasPre = false, hasSubsequence = false;
@@ -1981,11 +2029,9 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to reduce
-   * unnecessary merge. Only used when the file sender and the receiver share the same file
-   * close policy.
-   * Warning: DO NOT REMOVE
-   * @param resource
+   * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to
+   * reduce unnecessary merge. Only used when the file sender and the receiver share the same file
+   * close policy. Warning: DO NOT REMOVE
    */
   @SuppressWarnings("unused")
   public void removeFullyOverlapFiles(TsFileResource resource) {
@@ -2026,13 +2072,10 @@ public class StorageGroupProcessor {
 
   /**
    * remove the given tsFileResource. If the corresponding tsFileProcessor is in the working status,
-   * close it before remove the related resource files.
-   * maybe time-consuming for closing a tsfile.
-   * @param tsFileResource
-   * @param iterator
-   * @param isSeq
+   * close it before remove the related resource files. maybe time-consuming for closing a tsfile.
    */
-  private void removeFullyOverlapFile(TsFileResource tsFileResource, Iterator<TsFileResource> iterator
+  private void removeFullyOverlapFile(TsFileResource tsFileResource,
+      Iterator<TsFileResource> iterator
       , boolean isSeq) {
     if (!tsFileResource.isClosed()) {
       // also remove the TsFileProcessor if the overlapped file is not closed
@@ -2074,8 +2117,8 @@ public class StorageGroupProcessor {
    * version number is the version number in the tsfile with a larger timestamp.
    *
    * @param tsfileName origin tsfile name
-   * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex
-   *                   + 1]
+   * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
+   * 1]
    * @return appropriate filename
    */
   private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2142,9 +2185,9 @@ public class StorageGroupProcessor {
    * @param type load type
    * @param tsFileResource tsfile resource to be loaded
    * @param filePartitionId the partition id of the new file
-   * @UsedBy sync module, load external tsfile module.
    * @return load the file successfully
    * @UsedBy sync module, load external tsfile module.
+   * @UsedBy sync module, load external tsfile module.
    */
   private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
       TsFileResource tsFileResource, long filePartitionId)
@@ -2153,7 +2196,8 @@ public class StorageGroupProcessor {
     switch (type) {
       case LOAD_UNSEQUENCE:
         targetFile = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
-            storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource
+            storageGroupName + File.separatorChar + filePartitionId + File.separator
+                + tsFileResource
                 .getFile().getName());
         tsFileResource.setFile(targetFile);
         if (unSequenceFileList.contains(tsFileResource)) {
@@ -2213,7 +2257,8 @@ public class StorageGroupProcessor {
     }
     partitionDirectFileVersions.computeIfAbsent(filePartitionId,
         p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
-    updatePartitionFileVersion(filePartitionId, Collections.max(tsFileResource.getHistoricalVersions()));
+    updatePartitionFileVersion(filePartitionId,
+        Collections.max(tsFileResource.getHistoricalVersions()));
     return true;
   }
 
@@ -2354,6 +2399,7 @@ public class StorageGroupProcessor {
 
   @FunctionalInterface
   public interface CloseTsFileCallBack {
+
     void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
   }
 
@@ -2362,17 +2408,15 @@ public class StorageGroupProcessor {
   }
 
   /**
-   * Check if the data of "tsFileResource" all exist locally by comparing the historical versions
-   * in the partition of "partitionNumber". This is available only when the IoTDB instances which generated
-   * "tsFileResource" have the same close file policy as the local one.
-   * If one of the version in "tsFileResource" equals to a version of a working file, false is
-   * returned because "tsFileResource" may have unwritten data of that file.
-   * @param tsFileResource
-   * @param partitionNum
+   * Check if the data of "tsFileResource" all exist locally by comparing the historical versions in
+   * the partition of "partitionNumber". This is available only when the IoTDB instances which
+   * generated "tsFileResource" have the same close file policy as the local one. If one of the
+   * version in "tsFileResource" equals to a version of a working file, false is returned because
+   * "tsFileResource" may have unwritten data of that file.
+   *
    * @return true if the historicalVersions of "tsFileResource" is a subset of
-   * partitionDirectFileVersions, or false if it is not a subset and it contains any
-   * version of a working file
-   * USED by cluster module
+   * partitionDirectFileVersions, or false if it is not a subset and it contains any version of a
+   * working file USED by cluster module
    */
   public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) {
     // consider the case: The local node crashes when it is writing TsFile no.5.