Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/03/16 10:59:36 UTC

[iotdb] branch rel/0.11 updated: [To 0.11] fix unseq compaction data loss bug after delete operation (#2801)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 052b232  [To 0.11] fix unseq compaction data loss bug after delete operation (#2801)
052b232 is described below

commit 052b232a7de30ef771a06d89af667ee71f247377
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Tue Mar 16 18:59:12 2021 +0800

    [To 0.11] fix unseq compaction data loss bug after delete operation (#2801)
---
 .../db/engine/merge/manage/MergeResource.java      |  56 ++++--
 .../iotdb/db/engine/merge/task/MergeFileTask.java  | 114 +++++++----
 .../db/engine/merge/task/MergeMultiChunkTask.java  | 211 ++++++++++++++-------
 3 files changed, 254 insertions(+), 127 deletions(-)
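
In outline: during a merge, MergeResource now accumulates each device's minimum start time and maximum end time in a new startEndTimeCache (fed from the post-deletion chunk metadata in MergeMultiChunkTask and from the chunks actually written out in MergeFileTask), and at the end of the file merge the seq file's per-device time index is overwritten from that cache via putStartTime/putEndTime instead of only being widened via updateStartTime/updateEndTime. Below is a minimal standalone sketch of why the overwrite matters, assuming the old update path kept the widest range ever seen (as its cache counterparts in this patch do); TimeIndexSketch and its field are illustrative, not IoTDB API. A stale range that disagrees with the data surviving a delete is presumably the inconsistency behind the data loss named in the title.

    import java.util.HashMap;
    import java.util.Map;

    public class TimeIndexSketch {
      private final Map<String, Long> deviceEndTime = new HashMap<>();

      // widen-only, like the old per-chunk updateEndTime path
      void updateEndTime(String device, long t) {
        deviceEndTime.merge(device, t, Math::max);
      }

      // overwrite, like the putEndTime call this patch introduces
      void putEndTime(String device, long t) {
        deviceEndTime.put(device, t);
      }

      public static void main(String[] args) {
        TimeIndexSketch idx = new TimeIndexSketch();
        idx.updateEndTime("root.sg.d1", 300);  // range recorded before the delete
        // data after t=200 is deleted; the merged chunks now end at t=200
        idx.updateEndTime("root.sg.d1", 200);  // widen-only: stays 300 (stale)
        System.out.println(idx.deviceEndTime); // {root.sg.d1=300}
        idx.putEndTime("root.sg.d1", 200);     // overwrite: matches surviving data
        System.out.println(idx.deviceEndTime); // {root.sg.d1=200}
      }
    }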

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index d304d4c..bb4d069 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -60,7 +61,10 @@ public class MergeResource {
   private Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
   private Map<TsFileResource, RestorableTsFileIOWriter> fileWriterCache = new HashMap<>();
   private Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
-  private Map<PartialPath, MeasurementSchema> measurementSchemaMap = new HashMap<>(); //is this too waste?
+  private Map<TsFileResource, Map<String, Pair<Long, Long>>> startEndTimeCache =
+      new HashMap<>(); // pair<startTime, endTime>
+  private Map<PartialPath, MeasurementSchema> measurementSchemaMap =
+      new HashMap<>(); // is this too wasteful?
   private Map<MeasurementSchema, IChunkWriter> chunkWriterCache = new ConcurrentHashMap<>();
 
   private long timeLowerBound = Long.MIN_VALUE;
@@ -68,10 +72,8 @@ public class MergeResource {
   private boolean cacheDeviceMeta = false;
 
   public MergeResource(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
-    this.seqFiles = seqFiles.stream().filter(this::filterResource)
-        .collect(Collectors.toList());
-    this.unseqFiles = unseqFiles.stream().filter(this::filterResource)
-        .collect(Collectors.toList());
+    this.seqFiles = seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
+    this.unseqFiles = unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
   }
 
   /** If returns true, it means to participate in the merge */
@@ -81,13 +83,11 @@ public class MergeResource {
         && (!res.isClosed() || res.stillLives(timeLowerBound));
   }
 
-  public MergeResource(Collection<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
-      long timeLowerBound) {
+  public MergeResource(
+      Collection<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, long timeLowerBound) {
     this.timeLowerBound = timeLowerBound;
-    this.seqFiles =
-        seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
-    this.unseqFiles =
-        unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
+    this.seqFiles = seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
+    this.unseqFiles = unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
   }
 
   public void clear() throws IOException {
@@ -118,8 +118,9 @@ public class MergeResource {
   public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
     RestorableTsFileIOWriter writer = fileWriterCache.get(resource);
     if (writer == null) {
-      writer = new RestorableTsFileIOWriter(FSFactoryProducer.getFSFactory()
-          .getFile(resource.getTsFilePath() + MERGE_SUFFIX));
+      writer =
+          new RestorableTsFileIOWriter(
+              FSFactoryProducer.getFSFactory().getFile(resource.getTsFilePath() + MERGE_SUFFIX));
       fileWriterCache.put(resource, writer);
     }
     return writer;
@@ -183,8 +184,9 @@ public class MergeResource {
    */
   public List<Modification> getModifications(TsFileResource tsFileResource, PartialPath path) {
     // copy from TsFileResource so queries are not affected
-    List<Modification> modifications = modificationCache.computeIfAbsent(tsFileResource,
-        resource -> new LinkedList<>(resource.getModFile().getModifications()));
+    List<Modification> modifications =
+        modificationCache.computeIfAbsent(
+            tsFileResource, resource -> new LinkedList<>(resource.getModFile().getModifications()));
     List<Modification> pathModifications = new ArrayList<>();
     Iterator<Modification> modificationIterator = modifications.iterator();
     while (modificationIterator.hasNext()) {
@@ -234,8 +236,7 @@ public class MergeResource {
     return unseqFiles;
   }
 
-  public void setUnseqFiles(
-      List<TsFileResource> unseqFiles) {
+  public void setUnseqFiles(List<TsFileResource> unseqFiles) {
     this.unseqFiles = unseqFiles;
   }
 
@@ -265,4 +266,25 @@ public class MergeResource {
     this.chunkWriterCache.clear();
   }
 
+  public void updateStartTime(TsFileResource tsFileResource, String device, long startTime) {
+    Map<String, Pair<Long, Long>> deviceStartEndTimePairMap =
+        startEndTimeCache.computeIfAbsent(tsFileResource, k -> new HashMap<>());
+    Pair<Long, Long> startEndTimePair =
+        deviceStartEndTimePairMap.computeIfAbsent(
+            device, k -> new Pair<>(Long.MAX_VALUE, Long.MIN_VALUE));
+    startEndTimePair.left = startEndTimePair.left > startTime ? startTime : startEndTimePair.left;
+  }
+
+  public void updateEndTime(TsFileResource tsFileResource, String device, long endTime) {
+    Map<String, Pair<Long, Long>> deviceStartEndTimePairMap =
+        startEndTimeCache.computeIfAbsent(tsFileResource, k -> new HashMap<>());
+    Pair<Long, Long> startEndTimePair =
+        deviceStartEndTimePairMap.computeIfAbsent(
+            device, k -> new Pair<>(Long.MAX_VALUE, Long.MIN_VALUE));
+    startEndTimePair.right = startEndTimePair.right < endTime ? endTime : startEndTimePair.right;
+  }
+
+  public Map<String, Pair<Long, Long>> getStartEndTime(TsFileResource tsFileResource) {
+    return startEndTimeCache.getOrDefault(tsFileResource, new HashMap<>());
+  }
 }
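
The cache added above is keyed by TsFileResource and holds one Pair<startTime, endTime> per device, seeded with (Long.MAX_VALUE, Long.MIN_VALUE) and tightened as chunk metadata is seen. A self-contained sketch of the same idiom follows; assumptions: file keys are plain strings here, Pair is stubbed in place of org.apache.iotdb.tsfile.utils.Pair, and Math.min/Math.max replace the patch's ternaries (same behavior).

    import java.util.HashMap;
    import java.util.Map;

    public class StartEndTimeCacheSketch {
      // stand-in for org.apache.iotdb.tsfile.utils.Pair
      static final class Pair<L, R> {
        L left;
        R right;
        Pair(L left, R right) { this.left = left; this.right = right; }
      }

      // file -> (device -> (startTime, endTime)), like startEndTimeCache
      private final Map<String, Map<String, Pair<Long, Long>>> cache = new HashMap<>();

      void updateStartTime(String file, String device, long startTime) {
        Pair<Long, Long> p = cache.computeIfAbsent(file, k -> new HashMap<>())
            .computeIfAbsent(device, k -> new Pair<>(Long.MAX_VALUE, Long.MIN_VALUE));
        p.left = Math.min(p.left, startTime); // keep the earliest start seen
      }

      void updateEndTime(String file, String device, long endTime) {
        Pair<Long, Long> p = cache.computeIfAbsent(file, k -> new HashMap<>())
            .computeIfAbsent(device, k -> new Pair<>(Long.MAX_VALUE, Long.MIN_VALUE));
        p.right = Math.max(p.right, endTime); // keep the latest end seen
      }

      Map<String, Pair<Long, Long>> getStartEndTime(String file) {
        // like the patch: a fresh empty map for files with no recorded times
        return cache.getOrDefault(file, new HashMap<>());
      }

      public static void main(String[] args) {
        StartEndTimeCacheSketch c = new StartEndTimeCacheSketch();
        c.updateStartTime("f1.tsfile", "root.sg.d1", 100L);
        c.updateEndTime("f1.tsfile", "root.sg.d1", 200L);
        c.updateStartTime("f1.tsfile", "root.sg.d1", 50L);
        Pair<Long, Long> p = c.getStartEndTime("f1.tsfile").get("root.sg.d1");
        System.out.println(p.left + ".." + p.right); // prints 50..200
      }
    }

One detail worth noting: getStartEndTime returns a fresh empty map (not inserted into the cache) when a file has no recorded times, so callers iterating over it for an untouched file simply do nothing.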
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 2685c1d..92914a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -67,8 +68,12 @@ class MergeFileTask {
   private int currentMergeIndex;
   private String currMergeFile;
 
-  MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
-      MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
+  MergeFileTask(
+      String taskName,
+      MergeContext context,
+      MergeLogger mergeLogger,
+      MergeResource resource,
+      List<TsFileResource> unmergedSeqFiles) {
     this.taskName = taskName;
     this.context = context;
     this.mergeLogger = mergeLogger;
@@ -93,16 +98,24 @@ class MergeFileTask {
       if (mergedChunkNum >= unmergedChunkNum) {
         // move the unmerged data to the new file
         if (logger.isInfoEnabled()) {
-          logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
-                  + "unmerged chunks", taskName, seqFile.getTsFile().getName(), mergedChunkNum,
+          logger.info(
+              "{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+                  + "unmerged chunks",
+              taskName,
+              seqFile.getTsFile().getName(),
+              mergedChunkNum,
               unmergedChunkNum);
         }
         moveUnmergedToNew(seqFile);
       } else {
         // move the merged data to the old file
         if (logger.isInfoEnabled()) {
-          logger.info("{} moving merged data of {} to the old file {} merged chunks, {} "
-                  + "unmerged chunks", taskName, seqFile.getTsFile().getName(), mergedChunkNum,
+          logger.info(
+              "{} moving merged data of {} to the old file {} merged chunks, {} "
+                  + "unmerged chunks",
+              taskName,
+              seqFile.getTsFile().getName(),
+              mergedChunkNum,
               unmergedChunkNum);
         }
         moveMergedToOld(seqFile);
@@ -116,22 +129,27 @@ class MergeFileTask {
       logProgress();
     }
     if (logger.isInfoEnabled()) {
-      logger.info("{} has merged all files after {}ms", taskName,
-          System.currentTimeMillis() - startTime);
+      logger.info(
+          "{} has merged all files after {}ms", taskName, System.currentTimeMillis() - startTime);
     }
     mergeLogger.logMergeEnd();
   }
 
   private void logProgress() {
     if (logger.isDebugEnabled()) {
-      logger.debug("{} has merged {}, processed {}/{} files", taskName, currMergeFile,
-          currentMergeIndex + 1, unmergedFiles.size());
+      logger.debug(
+          "{} has merged {}, processed {}/{} files",
+          taskName,
+          currMergeFile,
+          currentMergeIndex + 1,
+          unmergedFiles.size());
     }
   }
 
   public String getProgress() {
-    return String.format("Merging %s, processed %d/%d files", currMergeFile,
-        currentMergeIndex + 1, unmergedFiles.size());
+    return String.format(
+        "Merging %s, processed %d/%d files",
+        currMergeFile, currentMergeIndex + 1, unmergedFiles.size());
   }
 
   private void moveMergedToOld(TsFileResource seqFile) throws IOException {
@@ -150,8 +168,7 @@ class MergeFileTask {
       TsFileIOWriter oldFileWriter = getOldFileWriter(seqFile);
 
       // filter the chunks that have been merged
-      oldFileWriter.filterChunks(new HashMap<>(context.getUnmergedChunkStartTimes().get(seqFile))
-      );
+      oldFileWriter.filterChunks(new HashMap<>(context.getUnmergedChunkStartTimes().get(seqFile)));
 
       RestorableTsFileIOWriter newFileWriter = resource.getMergeFileWriter(seqFile);
       newFileWriter.close();
@@ -160,11 +177,11 @@ class MergeFileTask {
         Map<String, List<ChunkMetadata>> chunkMetadataListInChunkGroups =
             newFileWriter.getDeviceChunkMetadataMap();
         if (logger.isDebugEnabled()) {
-          logger.debug("{} find {} merged chunk groups", taskName,
-              chunkMetadataListInChunkGroups.size());
+          logger.debug(
+              "{} find {} merged chunk groups", taskName, chunkMetadataListInChunkGroups.size());
         }
-        for (Map.Entry<String, List<ChunkMetadata>> entry : chunkMetadataListInChunkGroups
-            .entrySet()) {
+        for (Map.Entry<String, List<ChunkMetadata>> entry :
+            chunkMetadataListInChunkGroups.entrySet()) {
           String deviceId = entry.getKey();
           List<ChunkMetadata> chunkMetadataList = entry.getValue();
           writeMergedChunkGroup(chunkMetadataList, deviceId, newFileReader, oldFileWriter);
@@ -193,15 +210,26 @@ class MergeFileTask {
   }
 
   private void updateStartTimeAndEndTime(TsFileResource seqFile, TsFileIOWriter fileWriter) {
-    //TODO change to get one timeseries block each time
-    for (Entry<String, List<ChunkMetadata>> deviceChunkMetadataEntry : fileWriter
-        .getDeviceChunkMetadataMap().entrySet()) {
-      String device = deviceChunkMetadataEntry.getKey();
-      for (ChunkMetadata chunkMetadata : deviceChunkMetadataEntry.getValue()) {
-        seqFile.updateStartTime(device, chunkMetadata.getStartTime());
-        seqFile.updateEndTime(device, chunkMetadata.getEndTime());
+    // TODO change to get one timeseries block each time
+    Map<String, List<ChunkMetadata>> deviceChunkMetadataListMap =
+        fileWriter.getDeviceChunkMetadataMap();
+    for (Entry<String, List<ChunkMetadata>> deviceChunkMetadataListEntry :
+        deviceChunkMetadataListMap.entrySet()) {
+      String device = deviceChunkMetadataListEntry.getKey();
+      for (ChunkMetadata chunkMetadata : deviceChunkMetadataListEntry.getValue()) {
+        resource.updateStartTime(seqFile, device, chunkMetadata.getStartTime());
+        resource.updateEndTime(seqFile, device, chunkMetadata.getEndTime());
       }
     }
+    // update the start and end times of all devices in the resource
+    Map<String, Pair<Long, Long>> deviceStartEndTimePairMap = resource.getStartEndTime(seqFile);
+    for (Entry<String, Pair<Long, Long>> deviceStartEndTimePairEntry :
+        deviceStartEndTimePairMap.entrySet()) {
+      String device = deviceStartEndTimePairEntry.getKey();
+      Pair<Long, Long> startEndTimePair = deviceStartEndTimePairEntry.getValue();
+      seqFile.putStartTime(device, startEndTimePair.left);
+      seqFile.putEndTime(device, startEndTimePair.right);
+    }
   }
 
   /**
@@ -209,8 +237,8 @@ class MergeFileTask {
    * aborted.
    */
   private void restoreOldFile(TsFileResource seqFile) throws IOException {
-    RestorableTsFileIOWriter oldFileRecoverWriter = new RestorableTsFileIOWriter(
-        seqFile.getTsFile());
+    RestorableTsFileIOWriter oldFileRecoverWriter =
+        new RestorableTsFileIOWriter(seqFile.getTsFile());
     if (oldFileRecoverWriter.hasCrashed() && oldFileRecoverWriter.canWrite()) {
       oldFileRecoverWriter.endFile();
     } else {
@@ -218,15 +246,13 @@ class MergeFileTask {
     }
   }
 
-  /**
-   * Open an appending writer for an old seq file so we can add new chunks to it.
-   */
+  /** Open an appending writer for an old seq file so we can add new chunks to it. */
   private TsFileIOWriter getOldFileWriter(TsFileResource seqFile) throws IOException {
     TsFileIOWriter oldFileWriter;
     try {
       oldFileWriter = new ForceAppendTsFileWriter(seqFile.getTsFile());
-      mergeLogger.logFileMergeStart(seqFile.getTsFile(),
-          ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
+      mergeLogger.logFileMergeStart(
+          seqFile.getTsFile(), ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
       logger.debug("{} moving merged chunks of {} to the old file", taskName, seqFile);
       ((ForceAppendTsFileWriter) oldFileWriter).doTruncate();
     } catch (TsFileNotCompleteException e) {
@@ -248,8 +274,11 @@ class MergeFileTask {
     }
   }
 
-  private void writeMergedChunkGroup(List<ChunkMetadata> chunkMetadataList, String device,
-      TsFileSequenceReader reader, TsFileIOWriter fileWriter)
+  private void writeMergedChunkGroup(
+      List<ChunkMetadata> chunkMetadataList,
+      String device,
+      TsFileSequenceReader reader,
+      TsFileIOWriter fileWriter)
       throws IOException {
     fileWriter.startChunkGroup(device);
     long maxVersion = 0;
@@ -288,8 +317,9 @@ class MergeFileTask {
         }
 
         fileWriter.startChunkGroup(path.getDevice());
-        long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetadataList,
-            resource.getFileReader(seqFile), fileWriter);
+        long maxVersion =
+            writeUnmergedChunks(
+                chunkStartTimes, chunkMetadataList, resource.getFileReader(seqFile), fileWriter);
 
         if (Thread.interrupted()) {
           Thread.currentThread().interrupt();
@@ -329,9 +359,12 @@ class MergeFileTask {
     }
   }
 
-  private long writeUnmergedChunks(List<Long> chunkStartTimes,
-      List<ChunkMetadata> chunkMetadataList, TsFileSequenceReader reader,
-      RestorableTsFileIOWriter fileWriter) throws IOException {
+  private long writeUnmergedChunks(
+      List<Long> chunkStartTimes,
+      List<ChunkMetadata> chunkMetadataList,
+      TsFileSequenceReader reader,
+      RestorableTsFileIOWriter fileWriter)
+      throws IOException {
     long maxVersion = 0;
     int chunkIdx = 0;
     for (Long startTime : chunkStartTimes) {
@@ -353,5 +386,4 @@ class MergeFileTask {
     }
     return maxVersion;
   }
-
-}
\ No newline at end of file
+}
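
For orientation: executeFileMerge (partly shown above) moves whichever side holds fewer chunks, appending the unmerged chunks to the new .merge file when most chunks were merged, and appending the merged chunks back to the truncated old file otherwise. Both paths then reach updateStartTimeAndEndTime, which now rebuilds the seq file's per-device time index from the MergeResource cache. A toy sketch of that direction choice; the counts and the two method names follow the diff, the String return is illustrative only.

    public class MergeDirectionSketch {
      // mirrors: if (mergedChunkNum >= unmergedChunkNum) moveUnmergedToNew(seqFile);
      //          else moveMergedToOld(seqFile);
      static String chooseDirection(int mergedChunkNum, int unmergedChunkNum) {
        return mergedChunkNum >= unmergedChunkNum ? "moveUnmergedToNew" : "moveMergedToOld";
      }

      public static void main(String[] args) {
        // few unmerged chunks: copy them into the new file
        System.out.println(chooseDirection(10, 2)); // moveUnmergedToNew
        // few merged chunks: append them to the old file instead
        System.out.println(chooseDirection(1, 9));  // moveMergedToOld
      }
    }

Either way, only the smaller set of chunks has to be rewritten.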
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index a57bcf7..8be0bfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -61,8 +61,8 @@ import org.slf4j.LoggerFactory;
 public class MergeMultiChunkTask {
 
   private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class);
-  private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig()
-      .getMergeChunkPointNumberThreshold();
+  private static int minChunkPointNum =
+      IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold();
 
   private MergeLogger mergeLogger;
   private List<PartialPath> unmergedSeries;
@@ -84,9 +84,15 @@ public class MergeMultiChunkTask {
 
   private String storageGroupName;
 
-  public MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger mergeLogger,
-      MergeResource mergeResource, boolean fullMerge, List<PartialPath> unmergedSeries,
-      int concurrentMergeSeriesNum, String storageGroupName) {
+  public MergeMultiChunkTask(
+      MergeContext context,
+      String taskName,
+      MergeLogger mergeLogger,
+      MergeResource mergeResource,
+      boolean fullMerge,
+      List<PartialPath> unmergedSeries,
+      int concurrentMergeSeriesNum,
+      String storageGroupName) {
     this.mergeContext = context;
     this.taskName = taskName;
     this.mergeLogger = mergeLogger;
@@ -124,8 +130,8 @@ public class MergeMultiChunkTask {
       }
     }
     if (logger.isInfoEnabled()) {
-      logger.info("{} all series are merged after {}ms", taskName,
-          System.currentTimeMillis() - startTime);
+      logger.info(
+          "{} all series are merged after {}ms", taskName, System.currentTimeMillis() - startTime);
     }
     mergeLogger.logAllTsEnd();
   }
@@ -166,8 +172,7 @@ public class MergeMultiChunkTask {
     mergeLogger.logTSEnd();
   }
 
-  private void pathsMergeOneFile(int seqFileIdx, IPointReader[] unseqReaders)
-      throws IOException {
+  private void pathsMergeOneFile(int seqFileIdx, IPointReader[] unseqReaders) throws IOException {
     TsFileResource currTsFile = resource.getSeqFiles().get(seqFileIdx);
     String deviceId = currMergingPaths.get(0).getDevice();
     long currDeviceMinTime = currTsFile.getStartTime(deviceId);
@@ -196,6 +201,10 @@ public class MergeMultiChunkTask {
       modifications[i] = resource.getModifications(currTsFile, currMergingPaths.get(i));
       seqChunkMeta[i] = resource.queryChunkMetadata(currMergingPaths.get(i), currTsFile);
       modifyChunkMetaData(seqChunkMeta[i], modifications[i]);
+      for (ChunkMetadata chunkMetadata : seqChunkMeta[i]) {
+        resource.updateStartTime(currTsFile, deviceId, chunkMetadata.getStartTime());
+        resource.updateEndTime(currTsFile, deviceId, chunkMetadata.getEndTime());
+      }
 
       if (Thread.interrupted()) {
         Thread.currentThread().interrupt();
@@ -215,8 +224,14 @@ public class MergeMultiChunkTask {
     }
     // merge unseq data with seq data in this file or small chunks in this file into a larger chunk
     mergeFileWriter.startChunkGroup(deviceId);
-    boolean dataWritten = mergeChunks(seqChunkMeta, isLastFile, fileSequenceReader, unseqReaders,
-        mergeFileWriter, currTsFile);
+    boolean dataWritten =
+        mergeChunks(
+            seqChunkMeta,
+            isLastFile,
+            fileSequenceReader,
+            unseqReaders,
+            mergeFileWriter,
+            currTsFile);
     if (dataWritten) {
       mergeFileWriter.writeVersion(0L);
       mergeFileWriter.endChunkGroup();
@@ -230,8 +245,10 @@ public class MergeMultiChunkTask {
     // series should also be written into a new chunk
     List<Integer> ret = new ArrayList<>();
     for (int i = 0; i < currMergingPaths.size(); i++) {
-      if (seqChunkMeta[i] == null || seqChunkMeta[i].isEmpty()
-          && !(seqFileIdx + 1 == resource.getSeqFiles().size() && currTimeValuePairs[i] != null)) {
+      if (seqChunkMeta[i] == null
+          || seqChunkMeta[i].isEmpty()
+              && !(seqFileIdx + 1 == resource.getSeqFiles().size()
+                  && currTimeValuePairs[i] != null)) {
         continue;
       }
       ret.add(i);
@@ -240,13 +257,17 @@ public class MergeMultiChunkTask {
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  private boolean mergeChunks(List<ChunkMetadata>[] seqChunkMeta, boolean isLastFile,
-      TsFileSequenceReader reader, IPointReader[] unseqReaders,
-      RestorableTsFileIOWriter mergeFileWriter, TsFileResource currFile)
+  private boolean mergeChunks(
+      List<ChunkMetadata>[] seqChunkMeta,
+      boolean isLastFile,
+      TsFileSequenceReader reader,
+      IPointReader[] unseqReaders,
+      RestorableTsFileIOWriter mergeFileWriter,
+      TsFileResource currFile)
       throws IOException {
     int[] ptWrittens = new int[seqChunkMeta.length];
-    int mergeChunkSubTaskNum = IoTDBDescriptor.getInstance().getConfig()
-        .getMergeChunkSubThreadNum();
+    int mergeChunkSubTaskNum =
+        IoTDBDescriptor.getInstance().getConfig().getMergeChunkSubThreadNum();
     MetaListEntry[] metaListEntries = new MetaListEntry[currMergingPaths.size()];
     PriorityQueue<Integer>[] chunkIdxHeaps = new PriorityQueue[mergeChunkSubTaskNum];
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
@@ -271,13 +292,19 @@ public class MergeMultiChunkTask {
 
     List<Future<Void>> futures = new ArrayList<>();
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
-      futures.add(MergeManager.getINSTANCE()
-          .submitChunkSubTask(new MergeChunkHeapTask(chunkIdxHeaps[i],
-              metaListEntries, ptWrittens,
-              reader,
-              mergeFileWriter, unseqReaders,
-              currFile,
-              isLastFile, i)));
+      futures.add(
+          MergeManager.getINSTANCE()
+              .submitChunkSubTask(
+                  new MergeChunkHeapTask(
+                      chunkIdxHeaps[i],
+                      metaListEntries,
+                      ptWrittens,
+                      reader,
+                      mergeFileWriter,
+                      unseqReaders,
+                      currFile,
+                      isLastFile,
+                      i)));
 
       if (Thread.interrupted()) {
         Thread.currentThread().interrupt();
@@ -297,49 +324,74 @@ public class MergeMultiChunkTask {
     }
 
     // add merge and unmerged chunk statistic
-    mergeContext.getMergedChunkCnt().compute(currFile, (tsFileResource, anInt) -> anInt == null ?
-        mergedChunkNum.get() : anInt + mergedChunkNum.get());
-    mergeContext.getUnmergedChunkCnt().compute(currFile, (tsFileResource, anInt) -> anInt == null ?
-        unmergedChunkNum.get() : anInt + unmergedChunkNum.get());
+    mergeContext
+        .getMergedChunkCnt()
+        .compute(
+            currFile,
+            (tsFileResource, anInt) ->
+                anInt == null ? mergedChunkNum.get() : anInt + mergedChunkNum.get());
+    mergeContext
+        .getUnmergedChunkCnt()
+        .compute(
+            currFile,
+            (tsFileResource, anInt) ->
+                anInt == null ? unmergedChunkNum.get() : anInt + unmergedChunkNum.get());
 
     return mergedChunkNum.get() > 0;
   }
 
-
   /**
    * merge a sequence chunk SK
-   * <p>
-   * 1. no need to write the chunk to .merge file when: isn't full merge & there isn't unclosed
+   *
+   * <p>1. no need to write the chunk to .merge file when: isn't full merge & there isn't unclosed
    * chunk before & SK is big enough & SK isn't overflowed & SK isn't modified
+   *
    * <p>
-   * <p>
-   * 2. write SK to .merge.file without compressing when: is full merge & there isn't unclosed chunk
-   * before & SK is big enough & SK isn't overflowed & SK isn't modified
-   * <p>
-   * 3. other cases: need to unCompress the chunk and write 3.1 SK isn't overflowed 3.2 SK is
+   *
+   * <p>2. write SK to .merge.file without compressing when: is full merge & there isn't unclosed
+   * chunk before & SK is big enough & SK isn't overflowed & SK isn't modified
+   *
+   * <p>3. other cases: need to unCompress the chunk and write 3.1 SK isn't overflowed 3.2 SK is
    * overflowed
    */
   @SuppressWarnings("java:S2445") // avoid writing the same writer concurrently
-  private int mergeChunkV2(ChunkMetadata currMeta, boolean chunkOverflowed,
-      boolean chunkTooSmall, Chunk chunk, int lastUnclosedChunkPoint, int pathIdx,
-      TsFileIOWriter mergeFileWriter, IPointReader unseqReader,
-      IChunkWriter chunkWriter, TsFileResource currFile) throws IOException {
+  private int mergeChunkV2(
+      ChunkMetadata currMeta,
+      boolean chunkOverflowed,
+      boolean chunkTooSmall,
+      Chunk chunk,
+      int lastUnclosedChunkPoint,
+      int pathIdx,
+      TsFileIOWriter mergeFileWriter,
+      IPointReader unseqReader,
+      IChunkWriter chunkWriter,
+      TsFileResource currFile)
+      throws IOException {
 
     int unclosedChunkPoint = lastUnclosedChunkPoint;
-    boolean chunkModified = (currMeta.getDeleteIntervalList() != null &&
-        !currMeta.getDeleteIntervalList().isEmpty());
+    boolean chunkModified =
+        (currMeta.getDeleteIntervalList() != null && !currMeta.getDeleteIntervalList().isEmpty());
 
     // no need to write the chunk to .merge file
-    if (!fullMerge && lastUnclosedChunkPoint == 0 && !chunkTooSmall && !chunkOverflowed
+    if (!fullMerge
+        && lastUnclosedChunkPoint == 0
+        && !chunkTooSmall
+        && !chunkOverflowed
         && !chunkModified) {
       unmergedChunkNum.incrementAndGet();
-      mergeContext.getUnmergedChunkStartTimes().get(currFile).get(currMergingPaths.get(pathIdx))
+      mergeContext
+          .getUnmergedChunkStartTimes()
+          .get(currFile)
+          .get(currMergingPaths.get(pathIdx))
           .add(currMeta.getStartTime());
       return 0;
     }
 
     // write SK to .merge.file without compressing
-    if (fullMerge && lastUnclosedChunkPoint == 0 && !chunkTooSmall && !chunkOverflowed
+    if (fullMerge
+        && lastUnclosedChunkPoint == 0
+        && !chunkTooSmall
+        && !chunkOverflowed
         && !chunkModified) {
       synchronized (mergeFileWriter) {
         mergeFileWriter.writeChunk(chunk, currMeta);
@@ -356,8 +408,8 @@ public class MergeMultiChunkTask {
       mergedChunkNum.incrementAndGet();
     } else {
       // 3.2 SK is overflowed, uncompress sequence chunk and merge with unseq chunk, then write
-      unclosedChunkPoint += writeChunkWithUnseq(chunk, chunkWriter, unseqReader,
-          currMeta.getEndTime(), pathIdx);
+      unclosedChunkPoint +=
+          writeChunkWithUnseq(chunk, chunkWriter, unseqReader, currMeta.getEndTime(), pathIdx);
       mergedChunkNum.incrementAndGet();
     }
 
@@ -374,8 +426,9 @@ public class MergeMultiChunkTask {
     return unclosedChunkPoint;
   }
 
-  private int writeRemainingUnseq(IChunkWriter chunkWriter,
-      IPointReader unseqReader, long timeLimit, int pathIdx) throws IOException {
+  private int writeRemainingUnseq(
+      IChunkWriter chunkWriter, IPointReader unseqReader, long timeLimit, int pathIdx)
+      throws IOException {
     int ptWritten = 0;
     while (currTimeValuePairs[pathIdx] != null
         && currTimeValuePairs[pathIdx].getTimestamp() < timeLimit) {
@@ -388,8 +441,13 @@ public class MergeMultiChunkTask {
     return ptWritten;
   }
 
-  private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader,
-      long chunkLimitTime, int pathIdx) throws IOException {
+  private int writeChunkWithUnseq(
+      Chunk chunk,
+      IChunkWriter chunkWriter,
+      IPointReader unseqReader,
+      long chunkLimitTime,
+      int pathIdx)
+      throws IOException {
     int cnt = 0;
     ChunkReader chunkReader = new ChunkReader(chunk, null);
     while (chunkReader.hasNextSatisfiedPage()) {
@@ -400,8 +458,9 @@ public class MergeMultiChunkTask {
     return cnt;
   }
 
-  private int mergeWriteBatch(BatchData batchData, IChunkWriter chunkWriter,
-      IPointReader unseqReader, int pathIdx) throws IOException {
+  private int mergeWriteBatch(
+      BatchData batchData, IChunkWriter chunkWriter, IPointReader unseqReader, int pathIdx)
+      throws IOException {
     int cnt = 0;
     for (int i = 0; i < batchData.length(); i++) {
       long time = batchData.getTimeByIndex(i);
@@ -443,11 +502,16 @@ public class MergeMultiChunkTask {
 
     private int totalSeriesNum;
 
-    public MergeChunkHeapTask(PriorityQueue<Integer> chunkIdxHeap,
-        MetaListEntry[] metaListEntries, int[] ptWrittens,
+    public MergeChunkHeapTask(
+        PriorityQueue<Integer> chunkIdxHeap,
+        MetaListEntry[] metaListEntries,
+        int[] ptWrittens,
         TsFileSequenceReader reader,
         RestorableTsFileIOWriter mergeFileWriter,
-        IPointReader[] unseqReaders, TsFileResource currFile, boolean isLastFile, int taskNum) {
+        IPointReader[] unseqReaders,
+        TsFileResource currFile,
+        boolean isLastFile,
+        int taskNum) {
       this.chunkIdxHeap = chunkIdxHeap;
       this.metaListEntries = metaListEntries;
       this.ptWrittens = ptWrittens;
@@ -482,18 +546,28 @@ public class MergeMultiChunkTask {
           MetaListEntry metaListEntry = metaListEntries[pathIdx];
           ChunkMetadata currMeta = metaListEntry.current();
           boolean isLastChunk = !metaListEntry.hasNext();
-          boolean chunkOverflowed = MergeUtils
-              .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
-          boolean chunkTooSmall = MergeUtils
-              .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
+          boolean chunkOverflowed =
+              MergeUtils.isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
+          boolean chunkTooSmall =
+              MergeUtils.isChunkTooSmall(
+                  ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
 
           Chunk chunk;
           synchronized (reader) {
             chunk = reader.readMemChunk(currMeta);
           }
-          ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk,
-              ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter,
-              currFile);
+          ptWrittens[pathIdx] =
+              mergeChunkV2(
+                  currMeta,
+                  chunkOverflowed,
+                  chunkTooSmall,
+                  chunk,
+                  ptWrittens[pathIdx],
+                  pathIdx,
+                  mergeFileWriter,
+                  unseqReaders[pathIdx],
+                  chunkWriter,
+                  currFile);
 
           if (!isLastChunk) {
             metaListEntry.next();
@@ -504,9 +578,8 @@ public class MergeMultiChunkTask {
         // this only happens when the seqFiles do not contain this series, otherwise the remaining
         // data will be merged with the last chunk in the seqFiles
         if (isLastFile && currTimeValuePairs[pathIdx] != null) {
-          ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx],
-              Long.MAX_VALUE,
-              pathIdx);
+          ptWrittens[pathIdx] +=
+              writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx], Long.MAX_VALUE, pathIdx);
           mergedChunkNum.incrementAndGet();
         }
         // the last merged chunk may still be smaller than the threshold, flush it anyway
@@ -527,8 +600,8 @@ public class MergeMultiChunkTask {
     }
 
     public String getProgress() {
-      return String.format("Processed %d/%d series", totalSeriesNum - chunkIdxHeap.size(),
-          totalSeriesNum);
+      return String.format(
+          "Processed %d/%d series", totalSeriesNum - chunkIdxHeap.size(), totalSeriesNum);
     }
   }
 }
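
The javadoc above mergeChunkV2 enumerates a three-way decision per sequence chunk SK. A compilable restatement under stated assumptions: the enum and the boolean parameterization are mine; in the real method, "unclosed chunk before" corresponds to lastUnclosedChunkPoint != 0, and "modified" to a non-empty getDeleteIntervalList(), as the diff shows.

    public class MergeChunkDecisionSketch {
      enum ChunkAction { LEAVE_UNMERGED, WRITE_UNCOMPRESSED, UNCOMPRESS_AND_MERGE }

      static ChunkAction decide(
          boolean fullMerge,
          boolean unclosedChunkBefore,
          boolean chunkTooSmall,
          boolean chunkOverflowed,
          boolean chunkModified) {
        boolean keepableAsIs =
            !unclosedChunkBefore && !chunkTooSmall && !chunkOverflowed && !chunkModified;
        if (keepableAsIs) {
          // case 1: a normal merge leaves the chunk in place, recording it as
          // unmerged; case 2: a full merge copies it without re-compressing
          return fullMerge ? ChunkAction.WRITE_UNCOMPRESSED : ChunkAction.LEAVE_UNMERGED;
        }
        // case 3: deleted-from, overflowed, too small, or preceded by an
        // unclosed chunk: decompress, merge unseq points in, and rewrite
        return ChunkAction.UNCOMPRESS_AND_MERGE;
      }

      public static void main(String[] args) {
        System.out.println(decide(false, false, false, false, false)); // LEAVE_UNMERGED
        System.out.println(decide(true, false, false, false, false));  // WRITE_UNCOMPRESSED
        System.out.println(decide(false, false, false, false, true));  // UNCOMPRESS_AND_MERGE
      }
    }

Case 3 is the path that matters for this fix: a chunk touched by a delete is always rewritten, and its post-delete time range now flows into the start/end-time cache.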