Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/17 11:26:49 UTC

[incubator-iotdb] branch dev_merge updated: add documentation and code refinements

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_merge by this push:
     new a301d7b  add documentation and code refinements
a301d7b is described below

commit a301d7b89039de39eb9dd9bce3cd1d6a31991ec5
Author: 江天 <jt...@163.com>
AuthorDate: Wed Jul 17 19:24:25 2019 +0800

    add documentation and code refinements
---
 .../iotdb/db/engine/merge/manage/MergeManager.java |  4 +
 .../db/engine/merge/manage/MergeResource.java      | 17 +++-
 .../iotdb/db/engine/merge/recover/LogAnalyzer.java | 21 ++++-
 .../iotdb/db/engine/merge/recover/MergeLogger.java | 17 ++--
 .../engine/merge/selector/MergeFileSelector.java   | 70 ++++------------
 .../iotdb/db/engine/merge/task/MergeCallback.java  |  1 +
 .../{MergeSeriesTask.java => MergeChunkTask.java}  | 98 +++++++++++++---------
 .../iotdb/db/engine/merge/task/MergeFileTask.java  | 42 +++++-----
 .../iotdb/db/engine/merge/task/MergeTask.java      | 33 ++++----
 .../db/engine/merge/task/RecoverMergeTask.java     | 42 +++++-----
 .../java/org/apache/iotdb/db/utils/MergeUtils.java | 31 +++++++
 .../iotdb/tsfile/read/controller/ChunkLoader.java  |  4 +
 .../tsfile/read/controller/ChunkLoaderImpl.java    |  4 +
 13 files changed, 220 insertions(+), 164 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index 98132c1..a04e0f2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -33,6 +33,10 @@ import org.apache.iotdb.db.service.ServiceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * MergeManager provides a ThreadPool that queues and runs all merge tasks, restraining the
+ * total resources occupied by merging, and manages a Timer that periodically issues a global merge.
+ */
 public class MergeManager implements IService {
 
   private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);
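
As a rough illustration of the ThreadPool-plus-Timer pattern the new javadoc describes, consider
the sketch below; the pool size, schedule and method names are hypothetical, not MergeManager's
actual fields:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class MergeManagerSketch {
      // a bounded pool restrains how many merges may run at once
      private final ExecutorService mergePool = Executors.newFixedThreadPool(2);
      // a scheduler plays the role of the Timer that issues global merges
      private final ScheduledExecutorService timer =
          Executors.newSingleThreadScheduledExecutor();

      void start() {
        timer.scheduleAtFixedRate(this::issueGlobalMerge, 1, 1, TimeUnit.HOURS);
      }

      Future<Void> submit(Callable<Void> mergeTask) {
        return mergePool.submit(mergeTask); // queued if all workers are busy
      }

      private void issueGlobalMerge() {
        // submit a merge task for every storage group (omitted)
      }

      void stop() {
        timer.shutdownNow();
        mergePool.shutdownNow();
      }
    }
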
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index d8cc445..7792828 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -45,12 +45,18 @@ import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 
+/**
+ * MergeResource manages files and caches of readers, writers, MeasurementSchemas and
+ * modifications to avoid unnecessary object creation and repeated file opening.
+ */
 public class MergeResource {
 
   private List<TsFileResource> seqFiles;
   private List<TsFileResource> unseqFiles;
 
   // future feature
+  // keeping ChunkMetadata in memory avoids reading it again when we need to move unmerged
+  // chunks to the merged file, but this may consume a considerable amount of memory
   private boolean keepChunkMetadata = false;
 
   private QueryContext mergeContext = new QueryContext();
@@ -60,6 +66,7 @@ public class MergeResource {
   private Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
   private Map<TsFileResource, MetadataQuerier> metadataQuerierCache = new HashMap<>();
   private Map<String, MeasurementSchema> measurementSchemaMap = new HashMap<>();
+  private Map<MeasurementSchema, IChunkWriter> chunkWriterCache = new HashMap<>();
 
   public MergeResource(
       List<TsFileResource> seqFiles,
@@ -77,10 +84,12 @@ public class MergeResource {
     fileWriterCache.clear();
     modificationCache.clear();
     metadataQuerierCache.clear();
+    measurementSchemaMap.clear();
+    chunkWriterCache.clear();
   }
 
-  public  MeasurementSchema getSchema(Path path) {
-    return measurementSchemaMap.get(path.getMeasurement());
+  public  MeasurementSchema getSchema(String measurementId) {
+    return measurementSchemaMap.get(measurementId);
   }
 
   public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
@@ -117,8 +126,8 @@ public class MergeResource {
   }
 
   public IChunkWriter getChunkWriter(MeasurementSchema measurementSchema) {
-    return  new ChunkWriterImpl(measurementSchema,
-        new ChunkBuffer(measurementSchema), TSFileConfig.pageCheckSizeThreshold);
+    return chunkWriterCache.computeIfAbsent(measurementSchema,
+        k -> new ChunkWriterImpl(k, new ChunkBuffer(k), TSFileConfig.pageCheckSizeThreshold));
   }
 
   public TsFileSequenceReader getFileReader(TsFileResource tsFileResource) throws IOException {
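
The getChunkWriter change above swaps per-call construction for a per-schema cache built on
computeIfAbsent. The same idiom in isolation (the generic names here are illustrative, not
IoTDB API):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    class CacheSketch<K, V> {
      private final Map<K, V> cache = new HashMap<>();
      private final Function<K, V> factory;

      CacheSketch(Function<K, V> factory) {
        this.factory = factory;
      }

      // at most one value is ever created per distinct key
      V get(K key) {
        return cache.computeIfAbsent(key, factory);
      }

      // mirrors MergeResource.clear(): drop everything between merges
      void clear() {
        cache.clear();
      }
    }

Constructed as new CacheSketch<>(schema -> new ChunkWriterImpl(...)), every caller that shares a
MeasurementSchema reuses one writer, which is what the new chunkWriterCache buys.
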
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
index 5c7a004..19ed3d1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
@@ -37,6 +37,10 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * LogAnalyzer scans the "merge.log" file and recovers information such as the files involved in
+ * the last merge, the last available position of each file, and how many timeseries and files
+ * have been merged.
+ */
 public class LogAnalyzer {
 
   private static final Logger logger = LoggerFactory.getLogger(LogAnalyzer.class);
@@ -59,6 +63,12 @@ public class LogAnalyzer {
     this.logFile = logFile;
   }
 
+  /**
+   * Scan through the logs to find out where the last merge stopped and store the progress
+   * information in the fields of this class.
+   * @return a Status indicating the completed stage of the last merge.
+   * @throws IOException if the log file cannot be read
+   */
   public Status analyze() throws IOException {
     Status status = Status.NONE;
     try (BufferedReader bufferedReader =
@@ -135,7 +145,7 @@ public class LogAnalyzer {
       }
     }
     if (logger.isDebugEnabled()) {
-      logger.debug("{} found {} seq files after {}ms", taskName, mergeUnseqFiles.size(),
+      logger.debug("{} found {} unseq files after {}ms", taskName, mergeUnseqFiles.size(),
           (System.currentTimeMillis() - startTime));
     }
     resource.setUnseqFiles(mergeUnseqFiles);
@@ -208,7 +218,14 @@ public class LogAnalyzer {
   }
 
   public enum Status {
-    NONE, FILES_LOGGED, ALL_TS_MERGED, MERGE_END
+    // almost nothing has been done
+    NONE,
+    // at least the files to be merged are known
+    FILES_LOGGED,
+    // all the timeseries have been merged (merged chunks are generated)
+    ALL_TS_MERGED,
+    // all the merge temp files are merged with the original files and the job is almost done
+    MERGE_END
   }
 
   public List<Path> getUnmergedPaths() {
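
The four stages above are what a recovery task dispatches on; a minimal sketch, assuming
illustrative resume* methods (the real dispatch lives in RecoverMergeTask, further down in this
commit):

    class RecoverySketch {
      enum Status { NONE, FILES_LOGGED, ALL_TS_MERGED, MERGE_END }

      void recover(Status status) {
        switch (status) {
          case NONE:          startFromScratch(); break; // nothing durable yet: restart
          case FILES_LOGGED:  resumeChunkMerge(); break; // file lists known: redo chunk merge
          case ALL_TS_MERGED: resumeFileMerge();  break; // chunks merged: finish file merge
          case MERGE_END:     cleanUp();          break; // job essentially done: clean up
        }
      }

      private void startFromScratch() { /* omitted */ }
      private void resumeChunkMerge() { /* omitted */ }
      private void resumeFileMerge()  { /* omitted */ }
      private void cleanUp()          { /* omitted */ }
    }
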
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java
index 3e38936..24bf620 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java
@@ -28,17 +28,20 @@ import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.read.common.Path;
 
+/**
+ * MergeLogger records the progress of a merge in file "merge.log" as text lines.
+ */
 public class MergeLogger {
 
   public static final String MERGE_LOG_NAME = "merge.log";
 
-  public static final String STR_SEQ_FILES = "seqFiles";
-  public static final String STR_UNSEQ_FILES = "unseqFiles";
-  public static final String STR_START = "start";
-  public static final String STR_END = "end";
-  public static final String STR_ALL_TS_END = "all ts end";
-  public static final String STR_MERGE_START = "merge start";
-  public static final String STR_MERGE_END = "merge end";
+  static final String STR_SEQ_FILES = "seqFiles";
+  static final String STR_UNSEQ_FILES = "unseqFiles";
+  static final String STR_START = "start";
+  static final String STR_END = "end";
+  static final String STR_ALL_TS_END = "all ts end";
+  static final String STR_MERGE_START = "merge start";
+  static final String STR_MERGE_END = "merge end";
 
   private BufferedWriter logStream;
 
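
A minimal sketch of the text-line logging style these constants imply, assuming a layout of one
marker or file name per line; the real line format is defined by MergeLogger's write methods,
which this diff does not show:

    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.List;

    class MergeLoggerSketch {
      private final BufferedWriter log;

      MergeLoggerSketch(String storageGroupDir) throws IOException {
        // append, so a resumed merge keeps extending the same log
        log = new BufferedWriter(new FileWriter(storageGroupDir + "/merge.log", true));
      }

      void logFiles(List<String> seqFiles, List<String> unseqFiles) throws IOException {
        log.write("seqFiles");
        log.newLine();
        for (String f : seqFiles) {
          log.write(f);
          log.newLine();
        }
        log.write("unseqFiles");
        log.newLine();
        for (String f : unseqFiles) {
          log.write(f);
          log.newLine();
        }
        log.flush(); // progress must hit disk before the merge moves on
      }
    }
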
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/selector/MergeFileSelector.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/selector/MergeFileSelector.java
index 136a4f2..b826960 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/selector/MergeFileSelector.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/selector/MergeFileSelector.java
@@ -27,10 +27,10 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.stream.Collectors;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.utils.MergeUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
 import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
@@ -56,7 +56,6 @@ public class MergeFileSelector {
   private long totalCost;
   private long memoryBudget;
   private long maxSeqFileCost;
-  private long tempMaxSeqFileCost;
 
   private Map<TsFileResource, Long> fileMetaCostMap = new HashMap<>();
   private Map<TsFileResource, Long> maxSeriesCostMap = new HashMap<>();
@@ -67,9 +66,9 @@ public class MergeFileSelector {
   private Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
 
   private List<Integer> tmpSelectedSeqFiles;
+  private long tempMaxSeqFileCost;
 
   private boolean[] seqSelected;
-  private boolean[] unseqSelected;
 
   public MergeFileSelector(
       List<TsFileResource> seqFiles,
@@ -86,22 +85,22 @@ public class MergeFileSelector {
   * plus the new cost is still under the budget, accept the unseqFile and the seqFiles as
    * candidates, otherwise go to the next iteration.
    * The memory cost of a file is calculated in two ways:
-   *    The rough estimation: for a seqFile, the size of its metadata is the estimation. Since in
-   *    the worst case, the file only contains one timeseries and all its metadata will be loaded
-   *    into memory with at most one actual data chunk (which is negligible) and writing the
-   *    timeseries into a new file generate metadata of the similar size, so the size of all
+   *    The rough estimation: for a seqFile, the size of its metadata is used for estimation.
+   *    In the worst case, the file contains only one timeseries, all its metadata will be
+   *    loaded into memory with at most one actual data page (which is negligible), and writing
+   *    the timeseries into a new file generates metadata of a similar size, so the size of all
   *    seqFiles' metadata (generated when writing new chunks) plus the largest one (loaded
   *    when reading a timeseries from the seqFiles) is the total estimation of all seqFiles; for
-   *    an unseqFile, since the merge reader may read all its chunks to perform a merge read, the
-   *    whole file may be loaded into memory and for the same reason of writing series, so we use
-   *    the file's length as the maximum estimation.
-   *    The tight estimation: based on the rough estimation, we scan the file's meta data to
-   *    count the number of chunks for each series, and the series which have the most chunks in
-   *    the file and its chunk proportion to shrink the rough estimation.
+   *    an unseqFile, since the merge reader may read all chunks of a series to perform a merge
+   *    read, the whole file may be loaded into memory, so we use the file's length as the
+   *    maximum estimation.
+   *    The tight estimation: based on the rough estimation, we scan the file's metadata to
+   *    count the number of chunks for each series, find the series that has the most
+   *    chunks in the file, and use its chunk proportion to refine the rough estimation.
    * The rough estimation is performed first, if no candidates can be found using rough
    * estimation, we run the selection again with tight estimation.
    * @return two lists of TsFileResource, the former is selected seqFiles and the latter is
-   * selected unseqFiles or an empty array if there is no proper candidates by the budget.
+   * selected unseqFiles, or an empty array if there are no proper candidates under the budget.
    * @throws MergeException
    */
   public List[] doSelect() throws MergeException {
@@ -113,7 +112,7 @@ public class MergeFileSelector {
       if (selectedUnseqFiles.isEmpty()) {
         doSelect(true);
       }
-      clean();
+      clear();
       if (selectedUnseqFiles.isEmpty()) {
         logger.info("No merge candidates are found");
         return new List[0];
@@ -130,7 +129,7 @@ public class MergeFileSelector {
     return new List[]{selectedSeqFiles, selectedUnseqFiles};
   }
 
-  private void clean() throws IOException {
+  private void clear() throws IOException {
     for (TsFileSequenceReader reader : fileReaderCache.values()) {
       reader.close();
     }
@@ -141,17 +140,12 @@ public class MergeFileSelector {
 
     tmpSelectedSeqFiles = new ArrayList<>();
     seqSelected = new boolean[seqFiles.size()];
-    unseqSelected = new boolean[unseqFiles.size()];
 
     totalCost = 0;
 
     int unseqIndex = 0;
     while (unseqIndex < unseqFiles.size()) {
       // select next unseq files
-      if (unseqSelected[unseqIndex]) {
-        unseqIndex++;
-        continue;
-      }
       TsFileResource unseqFile = unseqFiles.get(unseqIndex);
 
       selectOverlappedSeqFiles(unseqFile);
@@ -162,7 +156,6 @@ public class MergeFileSelector {
 
       if (totalCost + newCost < memoryBudget) {
         selectedUnseqFiles.add(unseqFile);
-        unseqSelected[unseqIndex] = true;
         maxSeqFileCost = tempMaxSeqFileCost;
 
         for (Integer seqIdx : tmpSelectedSeqFiles) {
@@ -185,7 +178,7 @@ public class MergeFileSelector {
         continue;
       }
       TsFileResource seqFile = seqFiles.get(i);
-      if (fileOverlap(seqFile, unseqFile)) {
+      if (MergeUtils.fileOverlap(seqFile, unseqFile)) {
         tmpSelectedSeqFiles.add(i);
       }
     }
@@ -206,7 +199,7 @@ public class MergeFileSelector {
         cost += fileCost;
         tempMaxSeqFileCost = fileCost;
       }
-      // but writing data into a new file may generate the same amount of metadata
+      // but writing data into a new file may generate the same amount of metadata in memory
       cost += fileCost;
     }
     return cost;
@@ -321,35 +314,6 @@ public class MergeFileSelector {
   }
 
 
-  private boolean fileOverlap(TsFileResource seqFile, TsFileResource unseqFile) {
-    Map<String, Long> seqStartTimes = seqFile.getStartTimeMap();
-    Map<String, Long> seqEndTimes = seqFile.getEndTimeMap();
-    Map<String, Long> unseqStartTimes = unseqFile.getStartTimeMap();
-    Map<String, Long> unseqEndTimes = unseqFile.getEndTimeMap();
-
-    for (Entry<String, Long> seqEntry : seqStartTimes.entrySet()) {
-      Long unseqStartTime = unseqStartTimes.get(seqEntry.getKey());
-      if (unseqStartTime == null) {
-        continue;
-      }
-      Long unseqEndTime = unseqEndTimes.get(seqEntry.getKey());
-      Long seqStartTime = seqEntry.getValue();
-      Long seqEndTime = seqEndTimes.get(seqEntry.getKey());
-      
-      if (intervalOverlap(seqStartTime, seqEndTime, unseqStartTime, unseqEndTime)) {
-        return true;
-      }
-    }
-    return false;
-  }
-  
-  private boolean intervalOverlap(long l1, long r1, long l2, long r2) {
-   return  (l1 <= l2 && l2 <= r1) ||
-        (l1 <= r2 && r2 <= r1) ||
-        (l2 <= l1 && l1 <= r2) ||
-        (l2 <= r1 && r1 <= r2);
-  }
-
   private TsFileSequenceReader getReader(TsFileResource tsFileResource) throws IOException {
     TsFileSequenceReader reader = fileReaderCache.get(tsFileResource);
     if (reader == null) {
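
The rough estimation described in doSelect's javadoc reduces to simple sums. A condensed sketch
(it aggregates over a finished selection, whereas the real selector charges each candidate
incrementally and tracks the largest seqFile cost separately):

    import java.util.List;

    class CostEstimationSketch {
      long roughCost(List<Long> seqMetadataSizes, List<Long> unseqFileLengths) {
        long cost = 0;
        long maxSeqMeta = 0;
        for (long m : seqMetadataSizes) {
          cost += m;                          // metadata generated when writing new chunks
          maxSeqMeta = Math.max(maxSeqMeta, m);
        }
        cost += maxSeqMeta;                   // metadata loaded when reading one timeseries
        for (long len : unseqFileLengths) {
          cost += len;                        // worst case: the whole unseqFile in memory
        }
        return cost;
      }
    }
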
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeCallback.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeCallback.java
index 7377a2c..c66edee 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeCallback.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeCallback.java
@@ -32,6 +32,7 @@ public interface MergeCallback {
    *    modifications have been merged into the new files.
    *    2. remove the unseqFiles since they have been merged into new files.
    *    3. remove the merge log file
+   *    4. exit the merging status
    * @param seqFiles
    * @param unseqFiles
    */
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeSeriesTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeChunkTask.java
similarity index 79%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeSeriesTask.java
rename to iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeChunkTask.java
index e6ec446..165bc3d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeSeriesTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeChunkTask.java
@@ -29,12 +29,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.MergeUtils;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -42,8 +42,6 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
-import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
@@ -52,9 +50,16 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class MergeSeriesTask {
+/**
+ * MergeChunkTask rewrites chunks that satisfy the following conditions into the merge temp file:
+ *  1. the number of points in this chunk (the chunk size) < minChunkPointNum
+ *  2. the previous chunks have been rewritten but their total size is still less than minChunkPointNum
+ *  3. the chunk is overflowed (its time range overlaps with some unseq chunks)
+ *  4. the chunk has been updated/deleted
+ */
+class MergeChunkTask {
 
-  private static final Logger logger = LoggerFactory.getLogger(MergeSeriesTask.class);
+  private static final Logger logger = LoggerFactory.getLogger(MergeChunkTask.class);
   private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold();
 
   private Map<TsFileResource, Integer> mergedChunkCnt;
@@ -71,7 +76,7 @@ class MergeSeriesTask {
 
   int totalChunkWritten;
 
-  MergeSeriesTask(
+  MergeChunkTask(
       Map<TsFileResource, Integer> mergedChunkCnt,
       Map<TsFileResource, Integer> unmergedChunkCnt,
       Map<TsFileResource, Map<Path, List<Long>>> unmergedChunkStartTimes,
@@ -95,7 +100,7 @@ class MergeSeriesTask {
     for (TsFileResource seqFile : resource.getSeqFiles()) {
       unmergedChunkStartTimes.put(seqFile, new HashMap<>());
     }
-    // merge each series and write data into each seqFile's temp merge file
+    // merge each series and write data into each seqFile's corresponding temp merge file
     int mergedCnt = 0;
     double progress = 0.0;
     for (Path path : unmergedSeries) {
@@ -119,7 +124,7 @@ class MergeSeriesTask {
 
   private void mergeOnePath(Path path) throws IOException {
     IPointReader unseqReader = resource.getUnseqReader(path);
-    MeasurementSchema schema = resource.getSchema(path);
+    MeasurementSchema schema = resource.getSchema(path.getMeasurement());
     try {
       if (unseqReader.hasNext()) {
         currTimeValuePair = unseqReader.next();
@@ -157,28 +162,32 @@ class MergeSeriesTask {
     }
 
     TsFileSequenceReader fileSequenceReader = resource.getFileReader(currTsFile);
-    ChunkLoader chunkLoader = new ChunkLoaderImpl(fileSequenceReader);
     List<Modification> modifications = resource.getModifications(currTsFile, path);
     List<ChunkMetaData> seqChunkMeta = resource.queryChunkMetadata(path, currTsFile);
-    if (seqChunkMeta.isEmpty()) {
+    // if the last seqFile does not contain this series but the unseqFiles do, data of this
+    // series should also be written into a new chunk
+    if (seqChunkMeta.isEmpty()
+        && !(seqFileIdx + 1 == resource.getSeqFiles().size() && currTimeValuePair != null)) {
       return;
     }
     modifyChunkMetaData(seqChunkMeta, modifications);
     RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(currTsFile);
 
     currDeviceMaxTime = currTsFile.getEndTimeMap().get(path.getDevice());
-    // merge unseq data with this file or small chunks in this file into a larger chunk
+    // merge unseq data with seq data in this file or small chunks in this file into a larger chunk
     mergeFileWriter.startChunkGroup(deviceId);
-    if (mergeChunks(seqChunkMeta, fileLimitTime, chunkLoader, measurementSchema,
+    if (mergeChunks(seqChunkMeta, fileLimitTime, fileSequenceReader, measurementSchema,
         unseqReader, mergeFileWriter, currTsFile, path)) {
-      mergeFileWriter.endChunkGroup(seqChunkMeta.get(seqChunkMeta.size() - 1).getVersion() + 1);
+      long version = !seqChunkMeta.isEmpty() ?
+          seqChunkMeta.get(seqChunkMeta.size() - 1).getVersion() + 1 : 0;
+      mergeFileWriter.endChunkGroup(version);
       mergeLogger.logFilePositionUpdate(mergeFileWriter.getFile());
     }
     currTsFile.updateTime(path.getDevice(), currDeviceMaxTime);
   }
 
   private boolean mergeChunks(List<ChunkMetaData> seqChunkMeta, long fileLimitTime,
-      ChunkLoader chunkLoader, MeasurementSchema measurementSchema,
+      TsFileSequenceReader reader, MeasurementSchema measurementSchema,
       IPointReader unseqReader, RestorableTsFileIOWriter mergeFileWriter, TsFileResource currFile
       , Path path)
       throws IOException {
@@ -189,12 +198,10 @@ class MergeSeriesTask {
     for (int i = 0; i < seqChunkMeta.size(); i++) {
       ChunkMetaData currMeta = seqChunkMeta.get(i);
       boolean isLastChunk = i + 1 == seqChunkMeta.size();
-      long chunkLimitTime = i + 1 < seqChunkMeta.size() ? seqChunkMeta.get(i + 1).getStartTime()
-          : fileLimitTime;
+      long chunkLimitTime = isLastChunk ? fileLimitTime : seqChunkMeta.get(i + 1).getStartTime();
 
-      int newPtWritten = writeChunk(chunkLimitTime, ptWritten, chunkLoader, currMeta, chunkWriter,
-          isLastChunk,
-          measurementSchema, unseqReader);
+      int newPtWritten = writeChunk(chunkLimitTime, ptWritten, reader, currMeta, chunkWriter,
+          isLastChunk, measurementSchema, unseqReader);
 
       if (newPtWritten > ptWritten) {
         mergedChunkNum ++;
@@ -210,8 +217,16 @@ class MergeSeriesTask {
         ptWritten = 0;
       }
     }
+    // this only happens when the last seqFile does not contain this series, otherwise the remaining
+    // parts will be merged with the last chunk in the seqFile
+    if (fileLimitTime == Long.MAX_VALUE && currTimeValuePair != null) {
+      ptWritten += writeRemainingUnseq(chunkWriter, measurementSchema.getType(),
+          unseqReader, fileLimitTime);
+      mergedChunkNum ++;
+    }
+
+    // the last merged chunk may still be smaller than the threshold, flush it anyway
     if (ptWritten > 0) {
-      // the last merged chunk may still be smaller than the threshold, flush it anyway
       chunkWriter.writeToFileWriter(mergeFileWriter);
     }
     updateChunkCounts(currFile, mergedChunkNum, unmergedChunkNum);
@@ -219,6 +234,20 @@ class MergeSeriesTask {
     return mergedChunkNum > 0;
   }
 
+  private int writeRemainingUnseq(IChunkWriter chunkWriter,
+      TSDataType dataType, IPointReader unseqReader, long timeLimit) throws IOException {
+    int ptWritten = 0;
+    while (currTimeValuePair != null && currTimeValuePair.getTimestamp() < timeLimit) {
+      writeTVPair(currTimeValuePair, chunkWriter, dataType);
+      if (currTimeValuePair.getTimestamp() > currDeviceMaxTime) {
+        currDeviceMaxTime = currTimeValuePair.getTimestamp();
+      }
+      ptWritten ++;
+      currTimeValuePair = unseqReader.hasNext() ? unseqReader.next() : null;
+    }
+    return ptWritten;
+  }
+
   private void updateChunkCounts(TsFileResource currFile, int newMergedChunkNum,
       int newUnmergedChunkNum) {
     mergedChunkCnt.compute(currFile, (tsFileResource, anInt) -> anInt == null ? newMergedChunkNum
@@ -227,28 +256,28 @@ class MergeSeriesTask {
         : anInt + newUnmergedChunkNum);
   }
 
-  private int writeChunk(long chunkLimitTime, int ptWritten, ChunkLoader chunkLoader,
+  private int writeChunk(long chunkLimitTime, int ptWritten, TsFileSequenceReader reader,
       ChunkMetaData currMeta, IChunkWriter chunkWriter, boolean isLastChunk,
       MeasurementSchema measurementSchema, IPointReader unseqReader) throws IOException {
 
-    // unseq data is not over and this chunk's time range cover the overflow point
-    boolean chunkOverlap =
+    // unseq data is not over and this chunk's time range covers the current overflow point
+    boolean chunkOverflowed =
         currTimeValuePair != null && currTimeValuePair.getTimestamp() < chunkLimitTime;
-    // a small chunk has been written, this chunk merge with it to create a larger chunk
-    // or this chunk is too small and it is not the last chunk, merge it with the next chunk
+    // either a small chunk has already been written and this chunk should be merged with it to
+    // create a larger chunk, or this chunk is too small and not the last one, so it will be
+    // merged with the next chunks
     boolean chunkTooSmall =
         ptWritten > 0 || (minChunkPointNum >= 0 && currMeta.getNumOfPoints() < minChunkPointNum && !isLastChunk);
     boolean chunkModified = currMeta.getDeletedAt() > Long.MIN_VALUE;
     int newPtWritten = ptWritten;
 
-    if (!chunkOverlap && (chunkTooSmall || chunkModified || fullMerge)) {
+    if (!chunkOverflowed && (chunkTooSmall || chunkModified || fullMerge)) {
       // just rewrite the (modified) chunk
-      Chunk chunk = chunkLoader.getChunk(currMeta);
+      Chunk chunk = reader.readMemChunk(currMeta);
       newPtWritten += MergeUtils.writeChunkWithoutUnseq(chunk, chunkWriter, measurementSchema);
       totalChunkWritten ++;
-    } else if (chunkOverlap) {
+    } else if (chunkOverflowed) {
       // this chunk may merge with the current point
-      Chunk chunk = chunkLoader.getChunk(currMeta);
+      Chunk chunk = reader.readMemChunk(currMeta);
       newPtWritten += writeChunkWithUnseq(chunk, chunkWriter, measurementSchema.getType(),
           unseqReader, chunkLimitTime);
       totalChunkWritten ++;
@@ -264,14 +293,7 @@ class MergeSeriesTask {
       BatchData batchData = chunkReader.nextBatch();
       cnt += mergeWriteBatch(batchData, chunkWriter, dataType, unseqReader);
     }
-    while (currTimeValuePair != null && currTimeValuePair.getTimestamp() < chunkLimitTime) {
-      writeTVPair(currTimeValuePair, chunkWriter, dataType);
-      if (currTimeValuePair.getTimestamp() > currDeviceMaxTime) {
-        currDeviceMaxTime = currTimeValuePair.getTimestamp();
-      }
-      currTimeValuePair = unseqReader.hasNext() ? unseqReader.next() : null;
-      cnt ++;
-    }
+    cnt += writeRemainingUnseq(chunkWriter, dataType, unseqReader, chunkLimitTime);
     return cnt;
   }
 
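
The booleans in writeChunk drive a three-way decision. Restated as a standalone sketch (the enum
is ours for clarity; the production method returns updated point counts instead):

    class ChunkDecisionSketch {
      enum Action { MERGE_WITH_UNSEQ, REWRITE, LEAVE_UNMERGED }

      static Action decide(boolean chunkOverflowed, boolean chunkTooSmall,
          boolean chunkModified, boolean fullMerge) {
        if (chunkOverflowed) {
          return Action.MERGE_WITH_UNSEQ; // must interleave unseq points into the chunk
        }
        if (chunkTooSmall || chunkModified || fullMerge) {
          return Action.REWRITE;          // rewrite the chunk as-is into the temp file
        }
        return Action.LEAVE_UNMERGED;     // keep the chunk untouched in the original file
      }
    }
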
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index af129cb..6cb36f2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -26,8 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.commons.io.FileUtils;
-import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
@@ -35,8 +35,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
-import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -46,6 +44,11 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * MergeFileTask merges the merge temporary files with the seqFiles, either moving the merged
+ * chunks in the temp files into the seqFiles or moving the unmerged chunks into the merge temp
+ * files, depending on which kind is the majority.
+ */
 class MergeFileTask {
 
   private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class);
@@ -73,7 +76,7 @@ class MergeFileTask {
   }
 
   void mergeFiles() throws IOException {
-    // decide whether to write the unmerged chunks to the merge files or to move the merged data
+    // decide whether to write the unmerged chunks to the merge files or to move the merged chunks
     // back to the origin seqFile's
     if (logger.isInfoEnabled()) {
       logger.info("{} starts to merge {} files", taskName, unmergedFiles.size());
@@ -128,20 +131,19 @@ class MergeFileTask {
         // this file may already be truncated if this merge is a system reboot merge
         oldFileWriter = new RestorableTsFileIOWriter(seqFile.getFile());
       }
-
+      // filter out the chunks that have already been merged
       oldFileWriter.filterChunks(unmergedChunkStartTimes.get(seqFile));
 
       RestorableTsFileIOWriter newFileWriter = resource.getMergeFileWriter(seqFile);
       newFileWriter.close();
-      try ( TsFileSequenceReader newFileReader =
+      try (TsFileSequenceReader newFileReader =
           new TsFileSequenceReader(newFileWriter.getFile().getPath())) {
-        ChunkLoader chunkLoader = new ChunkLoaderImpl(newFileReader);
         List<ChunkGroupMetaData> chunkGroupMetadataList = newFileWriter.getChunkGroupMetaDatas();
         if (logger.isDebugEnabled()) {
           logger.debug("{} find {} merged chunk groups", taskName, chunkGroupMetadataList.size());
         }
         for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetadataList) {
-          writeMergedChunkGroup(chunkGroupMetaData, chunkLoader, oldFileWriter);
+          writeMergedChunkGroup(chunkGroupMetaData, newFileReader, oldFileWriter);
         }
       }
       oldFileWriter.endFile(new FileSchema(oldFileWriter.getKnownSchema()));
@@ -157,27 +159,23 @@ class MergeFileTask {
   }
 
   private void writeMergedChunkGroup(ChunkGroupMetaData chunkGroupMetaData,
-      ChunkLoader chunkLoader, TsFileIOWriter fileWriter)
+      TsFileSequenceReader reader, TsFileIOWriter fileWriter)
       throws IOException {
     fileWriter.startChunkGroup(chunkGroupMetaData.getDeviceID());
-    long maxVersion = Long.MIN_VALUE;
+    long version = chunkGroupMetaData.getVersion();
     for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
-      Path path = new Path(chunkGroupMetaData.getDeviceID(), chunkMetaData.getMeasurementUid());
-      MeasurementSchema measurementSchema = resource.getSchema(path);
+      MeasurementSchema measurementSchema = resource.getSchema(chunkMetaData.getMeasurementUid());
       IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
-      Chunk chunk = chunkLoader.getChunk(chunkMetaData);
+      Chunk chunk = reader.readMemChunk(chunkMetaData);
       writeChunkWithoutUnseq(chunk, chunkWriter, measurementSchema);
       chunkWriter.writeToFileWriter(fileWriter);
-      maxVersion = maxVersion < chunkMetaData.getVersion() ? chunkMetaData.getVersion() :
-          maxVersion;
     }
-    fileWriter.endChunkGroup(maxVersion + 1);
+    fileWriter.endChunkGroup(version + 1);
   }
 
   private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
     Map<Path, List<Long>> fileUnmergedChunkStartTimes = this.unmergedChunkStartTimes.get(seqFile);
     RestorableTsFileIOWriter fileWriter = resource.getMergeFileWriter(seqFile);
-    ChunkLoader chunkLoader = new ChunkLoaderImpl(resource.getFileReader(seqFile));
 
     mergeLogger.logFileMergeStart(fileWriter.getFile(), fileWriter.getFile().length());
     logger.debug("{} moving unmerged chunks of {} to the new file", taskName, seqFile);
@@ -193,7 +191,7 @@ class MergeFileTask {
         }
 
         List<ChunkMetaData> chunkMetaDataList = resource.queryChunkMetadata(path, seqFile);
-        MeasurementSchema measurementSchema = resource.getSchema(path);
+        MeasurementSchema measurementSchema = resource.getSchema(path.getMeasurement());
         IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
 
         if (logger.isDebugEnabled()) {
@@ -201,8 +199,8 @@ class MergeFileTask {
         }
 
         fileWriter.startChunkGroup(path.getDevice());
-        long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetaDataList, chunkLoader,
-            chunkWriter, measurementSchema, fileWriter);
+        long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetaDataList,
+            resource.getFileReader(seqFile), chunkWriter, measurementSchema, fileWriter);
         fileWriter.endChunkGroup(maxVersion + 1);
       }
     }
@@ -223,7 +221,7 @@ class MergeFileTask {
   }
 
   private long writeUnmergedChunks(List<Long> chunkStartTimes,
-      List<ChunkMetaData> chunkMetaDataList, ChunkLoader chunkLoader, IChunkWriter chunkWriter,
+      List<ChunkMetaData> chunkMetaDataList, TsFileSequenceReader reader, IChunkWriter chunkWriter,
       MeasurementSchema measurementSchema, RestorableTsFileIOWriter fileWriter) throws IOException {
     long maxVersion = 0;
     int chunkIdx = 0;
@@ -231,7 +229,7 @@ class MergeFileTask {
       for (; chunkIdx < chunkMetaDataList.size(); chunkIdx ++) {
         ChunkMetaData metaData = chunkMetaDataList.get(chunkIdx);
         if (metaData.getStartTime() == startTime) {
-          Chunk chunk = chunkLoader.getChunk(metaData);
+          Chunk chunk = reader.readMemChunk(metaData);
           writeChunkWithoutUnseq(chunk, chunkWriter, measurementSchema);
           chunkWriter.writeToFileWriter(fileWriter);
           maxVersion = metaData.getVersion() > maxVersion ? metaData.getVersion() : maxVersion;
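
The "which kind is the majority" choice in MergeFileTask's javadoc comes down to comparing the
two counters that MergeChunkTask maintains per seqFile; roughly:

    class MergeDirectionSketch {
      // moving the minority of chunks is cheaper than moving the majority
      static boolean moveMergedChunksBack(int mergedChunkCnt, int unmergedChunkCnt) {
        // true: copy the few merged chunks back into the old seqFile;
        // false: copy the few unmerged chunks into the new temp file instead
        return mergedChunkCnt < unmergedChunkCnt;
      }
    }
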
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 64dd4da..c743280 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -35,7 +35,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * MergeTask merges given SeqFiles and UnseqFiles into a new one.
+ * MergeTask merges given seqFiles and unseqFiles into a new one, which basically consists of three
+ * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files
+ *        2. move the merged chunks in the temp files back to the seqFiles, or move the unmerged
+ *        chunks in the seqFiles into the temp files and replace the seqFiles with the temp files
+ *        3. remove the unseqFiles
  */
 public class MergeTask implements Callable<Void> {
 
@@ -51,9 +55,7 @@ public class MergeTask implements Callable<Void> {
   Map<TsFileResource, Map<Path, List<Long>>> unmergedChunkStartTimes = new HashMap<>();
 
   private MergeCallback callback;
-
   String taskName;
-
   boolean fullMerge;
 
   public MergeTask(List<TsFileResource> seqFiles,
@@ -72,9 +74,10 @@ public class MergeTask implements Callable<Void> {
       doMerge();
     } catch (Exception e) {
       logger.error("Runtime exception in merge {}", taskName, e);
-      resource.setSeqFiles(Collections.emptyList());
-      resource.setUnseqFiles(Collections.emptyList());
-      cleanUp(true);
+      cleanUp(false);
+      // call the callback to make sure the StorageGroup exits the merging status, but pass two
+      // empty file lists to avoid the files being deleted
+      callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME));
       throw e;
     }
     return null;
@@ -89,12 +92,13 @@ public class MergeTask implements Callable<Void> {
     long totalFileSize = MergeUtils.collectFileSizes(resource.getSeqFiles(),
         resource.getUnseqFiles());
     mergeLogger = new MergeLogger(storageGroupDir);
+
     mergeLogger.logFiles(resource);
 
     List<Path> unmergedSeries = MergeUtils.collectPaths(resource);
-    MergeSeriesTask mergeSeriesTask = new MergeSeriesTask(mergedChunkCnt, unmergedChunkCnt,
+    MergeChunkTask mergeChunkTask = new MergeChunkTask(mergedChunkCnt, unmergedChunkCnt,
         unmergedChunkStartTimes, taskName, mergeLogger, resource, fullMerge, unmergedSeries);
-    mergeSeriesTask.mergeSeries();
+    mergeChunkTask.mergeSeries();
 
     MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
         unmergedChunkStartTimes, mergeLogger, resource, resource.getSeqFiles());
@@ -102,20 +106,18 @@ public class MergeTask implements Callable<Void> {
 
     cleanUp(true);
     if (logger.isInfoEnabled()) {
-      double elapsedTime = (double) (System.currentTimeMillis() - startTime);
-      double byteRate = totalFileSize / elapsedTime / 1024 / 1024 * 1000;
-      double seriesRate = unmergedSeries.size() / elapsedTime * 1000;
-      double chunkRate = mergeSeriesTask.totalChunkWritten / elapsedTime * 1000;
-      double fileRate =
-          (resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime * 1000;
+      // elapsedTime is now in seconds, so the per-second rates no longer carry a *1000 factor
+      double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0;
+      double byteRate = totalFileSize / elapsedTime / 1024 / 1024;
+      double seriesRate = unmergedSeries.size() / elapsedTime;
+      double chunkRate = mergeChunkTask.totalChunkWritten / elapsedTime;
+      double fileRate =
+          (resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime;
-      logger.info("{} ends after {}ms, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
+      logger.info("{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
               + "fileRate: {}/s",
           taskName, elapsedTime, byteRate, seriesRate, chunkRate, fileRate);
     }
   }
 
-
-
   void cleanUp(boolean executeCallback) throws IOException {
     logger.info("{} is cleaning up", taskName);
 
@@ -134,12 +136,11 @@ public class MergeTask implements Callable<Void> {
 
     File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
     if (executeCallback) {
-      // make sure merge.log is not deleted before unseqFiles are cleared
+      // make sure merge.log is not deleted until the unseqFiles are cleared, so that when the
+      // system reboots, the not-yet-deleted files can be deleted again
       callback.call(resource.getSeqFiles(), resource.getUnseqFiles(), logFile);
     } else {
       logFile.delete();
     }
   }
-
-
 }
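
With elapsedTime now expressed in seconds, the throughput figures are plain divisions. A worked
example with made-up numbers:

    class RateExampleSketch {
      public static void main(String[] args) {
        double elapsedTime = 64.0;                 // seconds
        long totalFileSize = 512L * 1024 * 1024;   // 512 MiB in bytes
        double byteRate = totalFileSize / elapsedTime / 1024 / 1024;
        System.out.println(byteRate + " MB/s");    // prints 8.0 MB/s
      }
    }
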
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index 809be34..6c12c16 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -37,13 +37,16 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * RecoverMergeTask is an extension of MergeTask, which resumes the last merge progress by
+ * scanning merge.log using LogAnalyzer and continues the unfinished merge.
+ */
 public class RecoverMergeTask extends MergeTask {
 
   private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
 
   private LogAnalyzer analyzer;
 
-
   public RecoverMergeTask(String storageGroupDir, MergeCallback callback, String taskName,
       boolean fullMerge) throws IOException {
     super(null, null, storageGroupDir, callback, taskName, fullMerge);
@@ -87,10 +90,10 @@ public class RecoverMergeTask extends MergeTask {
 
   private void resumeAfterFilesLogged(boolean continueMerge) throws IOException {
     if (continueMerge) {
-      resumeMerge();
-      MergeSeriesTask mergeSeriesTask = new MergeSeriesTask(mergedChunkCnt, unmergedChunkCnt,
+      resumeMergeProgress();
+      MergeChunkTask mergeChunkTask = new MergeChunkTask(mergedChunkCnt, unmergedChunkCnt,
           unmergedChunkStartTimes, taskName, mergeLogger, resource, fullMerge, analyzer.getUnmergedPaths());
-      mergeSeriesTask.mergeSeries();
+      mergeChunkTask.mergeSeries();
 
       MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
           unmergedChunkStartTimes, mergeLogger, resource, resource.getSeqFiles());
@@ -101,26 +104,25 @@ public class RecoverMergeTask extends MergeTask {
 
   private void resumeAfterAllTsMerged(boolean continueMerge) throws IOException {
     if (continueMerge) {
-      resumeMerge();
+      resumeMergeProgress();
       MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
           unmergedChunkStartTimes, mergeLogger, resource, analyzer.getUnmergedFiles());
       mergeFileTask.mergeFiles();
     } else {
-      // NOTICE: although some of the seqFiles may have been truncated, later TsFile recovery
-      // will recover them, so they are not a concern here
+      // NOTICE: although some of the seqFiles may have been truncated in the last merge, we do
+      // not recover them here because the later TsFile recovery will recover them
       truncateFiles();
     }
     cleanUp(continueMerge);
   }
 
-  private void resumeMerge() throws IOException {
+  private void resumeMergeProgress() throws IOException {
     mergeLogger = new MergeLogger(storageGroupDir);
     truncateFiles();
     recoverChunkCounts();
   }
 
-
-  // scan metadata to compute how many chunks are merged/unmerged so at last we can decide to
+  // scan the metadata to compute how many chunks are merged/unmerged, so that in the end we can decide to
   // move the merged chunks or the unmerged chunks
   private void recoverChunkCounts() throws IOException {
     logger.debug("{} recovering chunk counts", taskName);
@@ -143,25 +145,25 @@ public class RecoverMergeTask extends MergeTask {
         mergeFileWriter.getVisibleMetadataList(path.getDevice(), path.getMeasurement(), null);
     mergedChunkCnt.compute(tsFileResource, (k, v) -> v == null ? mergeFileChunks.size() :
         v + mergeFileChunks.size());
-    int seqIndex = 0;
-    int mergeIndex = 0;
+    int seqChunkIndex = 0;
+    int mergeChunkIndex = 0;
     int unmergedCnt = 0;
-    while (seqIndex < seqFileChunks.size() && mergeIndex < mergeFileChunks.size()) {
-      ChunkMetaData seqChunk = seqFileChunks.get(seqIndex);
-      ChunkMetaData mergedChunk = mergeFileChunks.get(mergeIndex);
+    while (seqChunkIndex < seqFileChunks.size() && mergeChunkIndex < mergeFileChunks.size()) {
+      ChunkMetaData seqChunk = seqFileChunks.get(seqChunkIndex);
+      ChunkMetaData mergedChunk = mergeFileChunks.get(mergeChunkIndex);
       if (seqChunk.getStartTime() < mergedChunk.getStartTime()) {
         // this seqChunk is unmerged
         unmergedCnt ++;
-        seqIndex ++;
+        seqChunkIndex ++;
         unmergedChunkStartTimes.get(tsFileResource).get(path).add(seqChunk.getStartTime());
       } else if (mergedChunk.getStartTime() <= seqChunk.getStartTime() &&
           seqChunk.getStartTime() <= mergedChunk.getEndTime()) {
         // this seqChunk is merged
-        seqIndex ++;
+        seqChunkIndex ++;
       } else {
         // seqChunk.startTime > mergeChunk.endTime, find next mergedChunk that may cover the
         // seqChunk
-        mergeIndex ++;
+        mergeChunkIndex ++;
       }
     }
     int finalUnmergedCnt = unmergedCnt;
@@ -183,8 +185,4 @@ public class RecoverMergeTask extends MergeTask {
       }
     }
   }
-
-
-
-
 }
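
The chunk classification inside recoverChunkCounts is a two-pointer walk over chunk start times.
Extracted into a self-contained sketch, where each long[] holds {startTime, endTime}:

    import java.util.List;

    class ChunkCountSketch {
      static int countUnmerged(List<long[]> seqChunks, List<long[]> mergedChunks) {
        int s = 0;
        int m = 0;
        int unmerged = 0;
        while (s < seqChunks.size() && m < mergedChunks.size()) {
          long seqStart = seqChunks.get(s)[0];
          long mergedStart = mergedChunks.get(m)[0];
          long mergedEnd = mergedChunks.get(m)[1];
          if (seqStart < mergedStart) {
            unmerged++;                 // not covered by any merged chunk: unmerged
            s++;
          } else if (seqStart <= mergedEnd) {
            s++;                        // absorbed by the current merged chunk: merged
          } else {
            m++;                        // look for the next merged chunk that may cover it
          }
        }
        return unmerged;
      }
    }
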
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index 9a3a830..e844c01 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -150,4 +152,33 @@ public class MergeUtils {
     }
     return chunk.getHeader().getNumOfPages();
   }
+
+  public static boolean fileOverlap(TsFileResource seqFile, TsFileResource unseqFile) {
+    Map<String, Long> seqStartTimes = seqFile.getStartTimeMap();
+    Map<String, Long> seqEndTimes = seqFile.getEndTimeMap();
+    Map<String, Long> unseqStartTimes = unseqFile.getStartTimeMap();
+    Map<String, Long> unseqEndTimes = unseqFile.getEndTimeMap();
+
+    for (Entry<String, Long> seqEntry : seqStartTimes.entrySet()) {
+      Long unseqStartTime = unseqStartTimes.get(seqEntry.getKey());
+      if (unseqStartTime == null) {
+        continue;
+      }
+      Long unseqEndTime = unseqEndTimes.get(seqEntry.getKey());
+      Long seqStartTime = seqEntry.getValue();
+      Long seqEndTime = seqEndTimes.get(seqEntry.getKey());
+
+      if (intervalOverlap(seqStartTime, seqEndTime, unseqStartTime, unseqEndTime)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static boolean intervalOverlap(long l1, long r1, long l2, long r2) {
+   return  (l1 <= l2 && l2 <= r1) ||
+        (l1 <= r2 && r2 <= r1) ||
+        (l2 <= l1 && l1 <= r2) ||
+        (l2 <= r1 && r1 <= r2);
+  }
 }
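
For reference, the four-clause intervalOverlap moved here is equivalent, for well-formed
intervals (l <= r), to the usual two-clause test:

    class OverlapSketch {
      // two closed intervals intersect iff each starts no later than the other ends
      static boolean intervalOverlap(long l1, long r1, long l2, long r2) {
        return l1 <= r2 && l2 <= r1;
      }
    }
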
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoader.java
index 78e561e..e7d9c70 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoader.java
@@ -34,4 +34,8 @@ public interface ChunkLoader {
    */
   void close() throws IOException;
 
+  /**
+   * Clear the chunk cache, if one is used.
+   */
+  void clear();
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java
index 4907634..ae16a97 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java
@@ -69,4 +69,8 @@ public class ChunkLoaderImpl implements ChunkLoader {
     reader.close();
   }
 
+  @Override
+  public void clear() {
+    chunkCache.clear();
+  }
 }
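
A hypothetical caller of the new clear() hook, dropping cached chunks between the phases of a
long-running read such as a merge:

    import java.io.IOException;
    import org.apache.iotdb.tsfile.read.controller.ChunkLoader;

    class LoaderUsageSketch {
      void runPhases(ChunkLoader loader) throws IOException {
        // ... phase 1: read chunks through the loader ...
        loader.clear();  // release the chunks cached during phase 1
        // ... phase 2: more reads ...
        loader.close();  // finally close the underlying reader
      }
    }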