You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/10/25 03:16:54 UTC

[incubator-iotdb] branch dev_new_merge_strategy updated: add revocery in squeeze strategy

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_new_merge_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_new_merge_strategy by this push:
     new 390bd3f  add revocery in squeeze strategy
390bd3f is described below

commit 390bd3f743f3740ba1fed8904ca11b16d329fc80
Author: jt <jt...@163.com>
AuthorDate: Fri Oct 25 11:16:31 2019 +0800

    add revocery in squeeze strategy
---
 .../engine/merge/inplace/recover/LogAnalyzer.java  |   6 +-
 .../engine/merge/inplace/recover/MergeLogger.java  |   2 +-
 .../task/{MergeTask.java => InplaceMergeTask.java} |  10 +-
 ...MergeTask.java => RecoverInplaceMergeTask.java} |  17 +-
 .../iotdb/db/engine/merge/manage/MergeManager.java |   4 +-
 .../db/engine/merge/manage/MergeResource.java      |   2 +-
 .../engine/merge/squeeze/recover/LogAnalyzer.java  | 185 +++------------------
 .../engine/merge/squeeze/recover/MergeLogger.java  |  52 +-----
 .../engine/merge/squeeze/task/MergeSeriesTask.java |  14 +-
 .../squeeze/task/RecoverSqueezeMergeTask.java      | 100 +++++++++++
 .../merge/squeeze/task/SqueezeMergeTask.java       |  30 ++--
 .../engine/storagegroup/StorageGroupProcessor.java |  29 +++-
 .../apache/iotdb/db/engine/merge/MergeLogTest.java |   6 +-
 .../iotdb/db/engine/merge/MergePerfTest.java       |   6 +-
 .../iotdb/db/engine/merge/MergeTaskTest.java       |  42 ++---
 15 files changed, 211 insertions(+), 294 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java
index 9b1f957..73ba34e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/LogAnalyzer.java
@@ -30,7 +30,7 @@ import java.util.Map.Entry;
 
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -176,7 +176,7 @@ public class LogAnalyzer {
 
     status = Status.MERGE_START;
     for (TsFileResource seqFile : resource.getSeqFiles()) {
-      File mergeFile = SystemFileFactory.INSTANCE.getFile(seqFile.getFile().getPath() + MergeTask.MERGE_SUFFIX);
+      File mergeFile = SystemFileFactory.INSTANCE.getFile(seqFile.getFile().getPath() + InplaceMergeTask.MERGE_SUFFIX);
       fileLastPositions.put(mergeFile, 0L);
     }
 
@@ -238,7 +238,7 @@ public class LogAnalyzer {
         fileLastPositions.put(currFile, lastPost);
       } else {
         fileLastPositions.remove(currFile);
-        String seqFilePath = currFile.getAbsolutePath().replace(MergeTask.MERGE_SUFFIX, "");
+        String seqFilePath = currFile.getAbsolutePath().replace(InplaceMergeTask.MERGE_SUFFIX, "");
         Iterator<TsFileResource> unmergedFileIter = unmergedFiles.iterator();
         while (unmergedFileIter.hasNext()) {
           TsFileResource seqFile = unmergedFileIter.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/MergeLogger.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/MergeLogger.java
index 24f6244..d4a6b51 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/MergeLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/recover/MergeLogger.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
  */
 public class MergeLogger {
 
-  public static final String MERGE_LOG_NAME = "merge.log";
+  public static final String MERGE_LOG_NAME = "merge.log.inplace";
 
   static final String STR_SEQ_FILES = "seqFiles";
   static final String STR_UNSEQ_FILES = "unseqFiles";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeTask.java
rename to server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java
index c993cd6..11ad188 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/InplaceMergeTask.java
@@ -47,10 +47,10 @@ import org.slf4j.LoggerFactory;
  *        chunks in the seqFiles into temp files and replace the seqFiles with the temp files.
  *        3. remove unseqFiles
  */
-public class MergeTask implements Callable<Void> {
+public class InplaceMergeTask implements Callable<Void> {
 
-  public static final String MERGE_SUFFIX = ".merge";
-  private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
+  public static final String MERGE_SUFFIX = ".merge.inplace";
+  private static final Logger logger = LoggerFactory.getLogger(InplaceMergeTask.class);
 
   MergeResource resource;
   String storageGroupSysDir;
@@ -63,7 +63,7 @@ public class MergeTask implements Callable<Void> {
   String taskName;
   boolean fullMerge;
 
-  MergeTask(List<TsFileResource> seqFiles,
+  InplaceMergeTask(List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles, String storageGroupSysDir, MergeCallback callback,
       String taskName, boolean fullMerge, String storageGroupName) {
     this.resource = new MergeResource(seqFiles, unseqFiles);
@@ -75,7 +75,7 @@ public class MergeTask implements Callable<Void> {
     this.storageGroupName = storageGroupName;
   }
 
-  public MergeTask(MergeResource mergeResource, String storageGroupSysDir, MergeCallback callback,
+  public InplaceMergeTask(MergeResource mergeResource, String storageGroupSysDir, MergeCallback callback,
       String taskName, boolean fullMerge, int concurrentMergeSeriesNum, String storageGroupName) {
     this.resource = mergeResource;
     this.storageGroupSysDir = storageGroupSysDir;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverMergeTask.java
rename to server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
index 93d0ae2..2c3a928 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/inplace/task/RecoverInplaceMergeTask.java
@@ -47,13 +47,13 @@ import org.slf4j.LoggerFactory;
  * RecoverMergeTask is an extension of MergeTask, which resumes the last merge progress by
  * scanning merge.log using LogAnalyzer and continue the unfinished merge.
  */
-public class RecoverMergeTask extends MergeTask {
+public class RecoverInplaceMergeTask extends InplaceMergeTask {
 
-  private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
+  private static final Logger logger = LoggerFactory.getLogger(RecoverInplaceMergeTask.class);
 
   private LogAnalyzer analyzer;
 
-  public RecoverMergeTask(List<TsFileResource> seqFiles,
+  public RecoverInplaceMergeTask(List<TsFileResource> seqFiles,
       List<TsFileResource> unseqFiles, String storageGroupSysDir,
       MergeCallback callback, String taskName,
       boolean fullMerge, String storageGroupName) {
@@ -160,9 +160,8 @@ public class RecoverMergeTask extends MergeTask {
       long maxChunkNum = chunkNums[1];
       long fileMetaSize = MergeUtils.getFileMetaSize(seqFile, resource.getFileReader(seqFile));
       long newSingleSeriesSeqReadCost =  fileMetaSize * maxChunkNum / totalChunkNum;
-      singleSeriesSeqReadCost = newSingleSeriesSeqReadCost > singleSeriesSeqReadCost ?
-          newSingleSeriesSeqReadCost : singleSeriesSeqReadCost;
-      maxSeqReadCost = fileMetaSize > maxSeqReadCost ? fileMetaSize : maxSeqReadCost;
+      singleSeriesSeqReadCost = Math.max(newSingleSeriesSeqReadCost, singleSeriesSeqReadCost);
+      maxSeqReadCost = Math.max(fileMetaSize, maxSeqReadCost);
       seqWriteCost += fileMetaSize;
     }
 
@@ -171,10 +170,8 @@ public class RecoverMergeTask extends MergeTask {
     int ub = MaxSeriesMergeFileSelector.MAX_SERIES_NUM;
     int mid = (lb + ub) / 2;
     while (mid != lb) {
-      long unseqCost = singleSeriesUnseqCost * mid < maxUnseqCost ? singleSeriesUnseqCost * mid :
-          maxUnseqCost;
-      long seqReadCos = singleSeriesSeqReadCost * mid < maxSeqReadCost ?
-          singleSeriesSeqReadCost * mid : maxSeqReadCost;
+      long unseqCost = Math.min(singleSeriesUnseqCost * mid, maxUnseqCost);
+      long seqReadCos = Math.min(singleSeriesSeqReadCost * mid, maxSeqReadCost);
       long totalCost = unseqCost + seqReadCos + seqWriteCost;
       if (totalCost <= memBudget) {
         lb = mid;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index a6d8432..93223f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
@@ -56,7 +56,7 @@ public class MergeManager implements IService {
     return INSTANCE;
   }
 
-  public void submitMainTask(MergeTask mergeTask) {
+  public void submitMainTask(InplaceMergeTask mergeTask) {
     mergeTaskPool.submit(mergeTask);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index 7372b44..d0cbbd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -46,7 +46,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
-import static org.apache.iotdb.db.engine.merge.inplace.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask.MERGE_SUFFIX;
 
 /**
  * MergeResource manages files and caches of readers, writers, MeasurementSchemas and
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/LogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/LogAnalyzer.java
index 04ffb6d..0354513 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/LogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/LogAnalyzer.java
@@ -19,53 +19,35 @@
 
 package org.apache.iotdb.db.engine.merge.squeeze.recover;
 
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_ALL_TS_END;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_END;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_MERGE_END;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_MERGE_START;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_SEQ_FILES;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_START;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_TIMESERIES;
-import static org.apache.iotdb.db.engine.merge.inplace.recover.MergeLogger.STR_UNSEQ_FILES;
+
+import static org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger.STR_ALL_TS_END;
+import static org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger.STR_SEQ_FILES;
+import static org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger.STR_UNSEQ_FILES;
 
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.MetadataErrorException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * LogAnalyzer scans the "merge.log" file and recovers information such as files of last merge,
- * the last available positions of each file and how many timeseries and files have been merged.
- * An example of merging 1 seqFile and 1 unseqFile containing 3 series is:
+ * and whether the new file has been generated.
+ * An example of merging 1 seqFile and 1 unseqFile is:
  * seqFiles
  * server/0seq.tsfile
  * unseqFiles
  * server/0unseq.tsfile
  * merge start
- * start root.mergeTest.device0.sensor0
- * server/0seq.tsfile.merge 338
- * end
- * start root.mergeTest.device0.sensor1
- * server/0seq.tsfile.merge 664
- * end
  * all ts end
- * server/0seq.tsfile 145462
- * end
+ * server/0seq.tsfile.merge
  * merge end
  */
 public class LogAnalyzer {
@@ -76,23 +58,16 @@ public class LogAnalyzer {
   private MergeResource resource;
   private String taskName;
   private File logFile;
-  private String storageGroupName;
-
-  private Map<File, Long> fileLastPositions = new HashMap<>();
-  private Map<File, Long> tempFileLastPositions = new HashMap<>();
 
-  private List<Path> mergedPaths = new ArrayList<>();
-  private List<Path> unmergedPaths;
-  private List<TsFileResource> unmergedFiles;
   private String currLine;
 
+  private TsFileResource newResource;
   private Status status;
 
-  public LogAnalyzer(MergeResource resource, String taskName, File logFile, String storageGroupName) {
+  public LogAnalyzer(MergeResource resource, String taskName, File logFile) {
     this.resource = resource;
     this.taskName = taskName;
     this.logFile = logFile;
-    this.storageGroupName = storageGroupName;
   }
 
   /**
@@ -101,7 +76,7 @@ public class LogAnalyzer {
    * @return a Status indicating the completed stage of the last merge.
    * @throws IOException
    */
-  public Status analyze() throws IOException, MetadataErrorException {
+  public Status analyze() throws IOException {
     status = Status.NONE;
     try (BufferedReader bufferedReader = new BufferedReader(new FileReader(logFile))) {
       currLine = bufferedReader.readLine();
@@ -110,15 +85,7 @@ public class LogAnalyzer {
 
         analyzeUnseqFiles(bufferedReader);
 
-        List<String> storageGroupPaths = MManager.getInstance().getPaths(storageGroupName + ".*");
-        unmergedPaths = new ArrayList<>();
-        for (String path : storageGroupPaths) {
-          unmergedPaths.add(new Path(path));
-        }
-
-        analyzeMergedSeries(bufferedReader, unmergedPaths);
-
-        analyzeMergedFiles(bufferedReader);
+        analyzeMergedFile(bufferedReader);
       }
     }
     return status;
@@ -159,7 +126,7 @@ public class LogAnalyzer {
     long startTime = System.currentTimeMillis();
     List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
     while ((currLine = bufferedReader.readLine()) != null) {
-      if (currLine.equals(STR_TIMESERIES)) {
+      if (currLine.equals(STR_ALL_TS_END)) {
         break;
       }
       Iterator<TsFileResource> iterator = resource.getUnseqFiles().iterator();
@@ -180,124 +147,28 @@ public class LogAnalyzer {
     resource.setUnseqFiles(mergeUnseqFiles);
   }
 
-  private void analyzeMergedSeries(BufferedReader bufferedReader, List<Path> unmergedPaths) throws IOException {
-    if (!STR_MERGE_START.equals(currLine)) {
-      return;
-    }
-
-    status = Status.MERGE_START;
-    for (TsFileResource seqFile : resource.getSeqFiles()) {
-      File mergeFile = SystemFileFactory.INSTANCE.getFile(seqFile.getFile().getPath() + MergeTask.MERGE_SUFFIX);
-      fileLastPositions.put(mergeFile, 0L);
-    }
-
-    List<Path> currTSList = new ArrayList<>();
-    long startTime = System.currentTimeMillis();
-    while ((currLine = bufferedReader.readLine()) != null) {
-      if (STR_ALL_TS_END.equals(currLine)) {
-        break;
-      }
-      if (currLine.contains(STR_START)) {
-        // a TS starts to merge
-        String[] splits = currLine.split(" ");
-        for (int i = 1; i < splits.length; i ++) {
-          currTSList.add(new Path(splits[i]));
-        }
-        tempFileLastPositions.clear();
-      } else if (!currLine.contains(STR_END)) {
-        // file position
-        String[] splits = currLine.split(" ");
-        File file = SystemFileFactory.INSTANCE.getFile(splits[0]);
-        Long position = Long.parseLong(splits[1]);
-        tempFileLastPositions.put(file, position);
-      } else {
-        // a TS ends merging
-        unmergedPaths.removeAll(currTSList);
-        for (Entry<File, Long> entry : tempFileLastPositions.entrySet()) {
-          fileLastPositions.put(entry.getKey(), entry.getValue());
-        }
-        mergedPaths.addAll(currTSList);
-      }
-    }
-    tempFileLastPositions = null;
-    if (logger.isDebugEnabled()) {
-      logger.debug("{} found {} series have already been merged after {}ms", taskName,
-          mergedPaths.size(), (System.currentTimeMillis() - startTime));
-    }
-  }
 
-  private void analyzeMergedFiles(BufferedReader bufferedReader) throws IOException {
+  private void analyzeMergedFile(BufferedReader bufferedReader) throws IOException {
     if (!STR_ALL_TS_END.equals(currLine)) {
       return;
     }
 
-    status = Status.ALL_TS_MERGED;
-    unmergedFiles = resource.getSeqFiles();
+    currLine = bufferedReader.readLine();
+    if (currLine != null) {
+      status = Status.ALL_TS_MERGED;
+      File newFile = FSFactoryProducer.getFSFactory().getFile(currLine);
+      newResource = new TsFileResource(newFile);
+      newResource.deSerialize();
 
-    File currFile = null;
-    long startTime = System.currentTimeMillis();
-    int mergedCnt = 0;
-    while ((currLine = bufferedReader.readLine()) != null) {
-      if (STR_MERGE_END.equals(currLine)) {
-        status = Status.MERGE_END;
-        break;
-      }
-      if (!currLine.contains(STR_END)) {
-        String[] splits = currLine.split(" ");
-        currFile = SystemFileFactory.INSTANCE.getFile(splits[0]);
-        Long lastPost = Long.parseLong(splits[1]);
-        fileLastPositions.put(currFile, lastPost);
-      } else {
-        fileLastPositions.remove(currFile);
-        String seqFilePath = currFile.getAbsolutePath().replace(MergeTask.MERGE_SUFFIX, "");
-        Iterator<TsFileResource> unmergedFileIter = unmergedFiles.iterator();
-        while (unmergedFileIter.hasNext()) {
-          TsFileResource seqFile = unmergedFileIter.next();
-          if (seqFile.getFile().getAbsolutePath().equals(seqFilePath)) {
-            mergedCnt ++;
-            unmergedFileIter.remove();
-            break;
-          }
-        }
+      if (logger.isDebugEnabled()) {
+        logger.debug("{} found files have already been merged into {}", taskName,
+            newFile.getPath());
       }
     }
-    if (logger.isDebugEnabled()) {
-      logger.debug("{} found {} files have already been merged after {}ms", taskName,
-          mergedCnt, (System.currentTimeMillis() - startTime));
-    }
-  }
-
-  public List<Path> getUnmergedPaths() {
-    return unmergedPaths;
-  }
-
-  public void setUnmergedPaths(List<Path> unmergedPaths) {
-    this.unmergedPaths = unmergedPaths;
-  }
-
-  public List<TsFileResource> getUnmergedFiles() {
-    return unmergedFiles;
-  }
-
-  public void setUnmergedFiles(
-      List<TsFileResource> unmergedFiles) {
-    this.unmergedFiles = unmergedFiles;
-  }
-
-  public List<Path> getMergedPaths() {
-    return mergedPaths;
-  }
-
-  public void setMergedPaths(List<Path> mergedPaths) {
-    this.mergedPaths = mergedPaths;
-  }
-
-  public Map<File, Long> getFileLastPositions() {
-    return fileLastPositions;
   }
 
-  public void setFileLastPositions(Map<File, Long> fileLastPositions) {
-    this.fileLastPositions = fileLastPositions;
+  public TsFileResource getNewResource() {
+    return newResource;
   }
 
   public enum Status {
@@ -305,9 +176,7 @@ public class LogAnalyzer {
     NONE,
     // at least the files and timeseries to be merged are known
     MERGE_START,
-    // all the timeseries have been merged(merged chunks are generated)
-    ALL_TS_MERGED,
-    // all the merge files are merged with the origin files and the task is almost done
-    MERGE_END
+    // all the timeseries have been merged (the new file has been generated)
+    ALL_TS_MERGED
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/MergeLogger.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/MergeLogger.java
index b707dec..1f99557 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/MergeLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/recover/MergeLogger.java
@@ -34,16 +34,11 @@ import org.apache.iotdb.tsfile.read.common.Path;
  */
 public class MergeLogger {
 
-  public static final String MERGE_LOG_NAME = "merge.log";
+  public static final String MERGE_LOG_NAME = "merge.log.squeeze";
 
   static final String STR_SEQ_FILES = "seqFiles";
   static final String STR_UNSEQ_FILES = "unseqFiles";
-  static final String STR_TIMESERIES = "timeseries";
-  static final String STR_START = "start";
-  static final String STR_END = "end";
   static final String STR_ALL_TS_END = "all ts end";
-  static final String STR_MERGE_START = "merge start";
-  static final String STR_MERGE_END = "merge end";
 
   private BufferedWriter logStream;
 
@@ -56,51 +51,12 @@ public class MergeLogger {
     logStream.close();
   }
 
-  public void logTSStart(List<Path> paths) throws IOException {
-    logStream.write(STR_START);
-    for (Path path : paths) {
-      logStream.write(" " + path.getFullPath());
-    }
-    logStream.newLine();
-    logStream.flush();
-  }
-
-  public void logFilePosition(File file) throws IOException {
-    logStream.write(String.format("%s %d", file.getAbsolutePath(), file.length()));
-    logStream.newLine();
-    logStream.flush();
-  }
-
-  public void logTSEnd() throws IOException {
-    logStream.write(STR_END);
-    logStream.newLine();
-    logStream.flush();
-  }
-
   public void logAllTsEnd() throws IOException {
     logStream.write(STR_ALL_TS_END);
     logStream.newLine();
     logStream.flush();
   }
 
-  public void logFileMergeStart(File file, long position) throws IOException {
-    logStream.write(String.format("%s %d", file.getAbsolutePath(), position));
-    logStream.newLine();
-    logStream.flush();
-  }
-
-  public void logFileMergeEnd() throws IOException {
-    logStream.write(STR_END);
-    logStream.newLine();
-    logStream.flush();
-  }
-
-  public void logMergeEnd() throws IOException {
-    logStream.write(STR_MERGE_END);
-    logStream.newLine();
-    logStream.flush();
-  }
-
   public void logFiles(MergeResource resource) throws IOException {
     logSeqFiles(resource.getSeqFiles());
     logUnseqFiles(resource.getUnseqFiles());
@@ -126,12 +82,6 @@ public class MergeLogger {
     logStream.flush();
   }
 
-  public void logMergeStart() throws IOException {
-    logStream.write(STR_MERGE_START);
-    logStream.newLine();
-    logStream.flush();
-  }
-
   public void logNewFile(TsFileResource resource) throws IOException {
     logStream.write(resource.getFile().getAbsolutePath());
     logStream.newLine();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/MergeSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/MergeSeriesTask.java
index 89796f8..ed14c6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/MergeSeriesTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/MergeSeriesTask.java
@@ -131,7 +131,6 @@ class MergeSeriesTask {
       logger.info("{} all series are merged after {}ms", taskName,
           System.currentTimeMillis() - startTime);
     }
-    mergeLogger.logAllTsEnd();
 
     return newResource;
   }
@@ -186,15 +185,10 @@ class MergeSeriesTask {
 
   private void createNewFileWriter() throws IOException {
     // use the minimum version as the version of the new file
-    long currFileVersion = Long.MAX_VALUE;
-    File parent = null;
-    for (TsFileResource seqFile : resource.getSeqFiles()) {
-      long fileVersion =
-          Long.parseLong(seqFile.getFile().getName()
-              .replace(TSFILE_SUFFIX, "").split(TSFILE_SEPARATOR)[1]);
-      currFileVersion = Math.min(currFileVersion, fileVersion);
-      parent = parent == null ? seqFile.getFile().getParentFile() : parent;
-    }
+    long currFileVersion =
+        Long.parseLong(
+            resource.getSeqFiles().get(0).getFile().getName().replace(TSFILE_SUFFIX, "").split(TSFILE_SEPARATOR)[1]);
+    File parent = resource.getSeqFiles().get(0).getFile().getParentFile();
     File newFile = FSFactoryProducer.getFSFactory().getFile(parent,
         System.currentTimeMillis() + TSFILE_SEPARATOR + currFileVersion + TSFILE_SUFFIX + MERGE_SUFFIX);
     newFileWriter = new RestorableTsFileIOWriter(newFile);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/RecoverSqueezeMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/RecoverSqueezeMergeTask.java
new file mode 100644
index 0000000..c32ffb6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/RecoverSqueezeMergeTask.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.squeeze.task;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.merge.MergeCallback;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.squeeze.recover.LogAnalyzer;
+import org.apache.iotdb.db.engine.merge.squeeze.recover.LogAnalyzer.Status;
+import org.apache.iotdb.db.engine.merge.squeeze.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RecoverSqueezeMergeTask is an extension of SqueezeMergeTask, which scans the squeeze merge log
+ * using LogAnalyzer and cleans up the last merge according to the status the log records.
+ */
+public class RecoverSqueezeMergeTask extends SqueezeMergeTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(RecoverSqueezeMergeTask.class);
+
+  public RecoverSqueezeMergeTask(List<TsFileResource> seqFiles,
+      List<TsFileResource> unseqFiles, String storageGroupSysDir,
+      MergeCallback callback, String taskName, String storageGroupName) {
+    super(new MergeResource(seqFiles, unseqFiles), storageGroupSysDir, callback, taskName,
+        1, storageGroupName);
+  }
+
+  public void recoverMerge() throws IOException {
+    File logFile = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME);
+    if (!logFile.exists()) {
+      logger.info("{} no merge.log, merge recovery ends", taskName);
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+
+    LogAnalyzer analyzer = new LogAnalyzer(resource, taskName, logFile);
+    Status status = analyzer.analyze();
+    if (logger.isInfoEnabled()) {
+      logger.info("{} merge recovery status determined: {} after {}ms", taskName, status,
+          (System.currentTimeMillis() - startTime));
+    }
+    switch (status) {
+      case NONE:
+        logFile.delete();
+        break;
+      case MERGE_START:
+        removeMergedFile();
+        // set the files to empty to let the StorageGroupProcessor do a clean up
+        resource.setSeqFiles(Collections.emptyList());
+        resource.setUnseqFiles(Collections.emptyList());
+        cleanUp(true);
+        break;
+      case ALL_TS_MERGED:
+        newResource = analyzer.getNewResource();
+        cleanUp(true);
+        break;
+      default:
+        throw new UnsupportedOperationException(taskName + " found unrecognized status " + status);
+    }
+    if (logger.isInfoEnabled()) {
+      logger.info("{} merge recovery ends after {}ms", taskName,
+          (System.currentTimeMillis() - startTime));
+    }
+  }
+
+  private void removeMergedFile() {
+    File sgDir =
+        FSFactoryProducer.getFSFactory().getFile(resource.getSeqFiles().get(0).getFile().getParent());
+    File[] mergeFiles = sgDir.listFiles(file -> file.getName().endsWith(MERGE_SUFFIX));
+    if (mergeFiles != null) {
+      for  (File file : mergeFiles) {
+        file.delete();
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/SqueezeMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/SqueezeMergeTask.java
index 4e6ac3d..96c4861 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/SqueezeMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/task/SqueezeMergeTask.java
@@ -23,20 +23,20 @@ import org.slf4j.LoggerFactory;
 
 public class SqueezeMergeTask implements Callable<Void> {
 
-  static final String MERGE_SUFFIX = ".merge";
+  static final String MERGE_SUFFIX = ".merge.squeeze";
   private static final Logger logger = LoggerFactory.getLogger(SqueezeMergeTask.class);
 
-  private MergeResource resource;
-  private String storageGroupSysDir;
-  private String storageGroupName;
+  MergeResource resource;
+  String storageGroupSysDir;
+  String storageGroupName;
   private MergeLogger mergeLogger;
   private MergeContext mergeContext = new MergeContext();
 
-  private MergeCallback callback;
-  private int concurrentMergeSeriesNum;
-  private String taskName;
+  MergeCallback callback;
+  int concurrentMergeSeriesNum;
+  String taskName;
 
-  private TsFileResource newResource;
+  TsFileResource newResource;
 
   public SqueezeMergeTask(MergeResource mergeResource, String storageGroupSysDir, MergeCallback callback,
       String taskName, int concurrentMergeSeriesNum, String storageGroupName) {
@@ -87,8 +87,6 @@ public class SqueezeMergeTask implements Callable<Void> {
       unmergedSeries.add(new Path(path));
     }
 
-    mergeLogger.logMergeStart();
-
     MergeSeriesTask mergeChunkTask = new MergeSeriesTask(mergeContext, taskName, mergeLogger, resource
         ,unmergedSeries, concurrentMergeSeriesNum);
     newResource = mergeChunkTask.mergeSeries();
@@ -98,17 +96,16 @@ public class SqueezeMergeTask implements Callable<Void> {
       double elapsedTime = (double) (System.currentTimeMillis() - startTime) / 1000.0;
       double byteRate = totalFileSize / elapsedTime / 1024 / 1024;
       double seriesRate = unmergedSeries.size() / elapsedTime;
-      double chunkRate = mergeContext.getTotalChunkWritten() / elapsedTime;
       double fileRate =
           (resource.getSeqFiles().size() + resource.getUnseqFiles().size()) / elapsedTime;
       double ptRate = mergeContext.getTotalPointWritten() / elapsedTime;
-      logger.info("{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, chunkRate: {}/s, "
+      logger.info("{} ends after {}s, byteRate: {}MB/s, seriesRate {}/s, "
               + "fileRate: {}/s, ptRate: {}/s",
-          taskName, elapsedTime, byteRate, seriesRate, chunkRate, fileRate, ptRate);
+          taskName, elapsedTime, byteRate, seriesRate, fileRate, ptRate);
     }
   }
 
-  private void cleanUp(boolean executeCallback) throws IOException {
+  void cleanUp(boolean executeCallback) throws IOException {
     logger.info("{} is cleaning up", taskName);
 
     resource.clear();
@@ -118,11 +115,6 @@ public class SqueezeMergeTask implements Callable<Void> {
       mergeLogger.close();
     }
 
-    for (TsFileResource seqFile : resource.getSeqFiles()) {
-      File mergeFile = FSFactoryProducer.getFSFactory().getFile(seqFile.getFile().getPath() + MERGE_SUFFIX);
-      mergeFile.delete();
-    }
-
     File logFile = FSFactoryProducer.getFSFactory().getFile(storageGroupSysDir,
         MergeLogger.MERGE_LOG_NAME);
     if (executeCallback) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 64aae8e..3c4f982 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
-import static org.apache.iotdb.db.engine.merge.inplace.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask.MERGE_SUFFIX;
 import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SEPARATOR;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
@@ -46,8 +46,8 @@ import org.apache.iotdb.db.engine.merge.IMergeFileSelector;
 import org.apache.iotdb.db.engine.merge.inplace.selector.MaxFileMergeFileSelector;
 import org.apache.iotdb.db.engine.merge.inplace.selector.MaxSeriesMergeFileSelector;
 import org.apache.iotdb.db.engine.merge.inplace.selector.MergeFileStrategy;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
-import org.apache.iotdb.db.engine.merge.inplace.task.RecoverMergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.RecoverInplaceMergeTask;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -223,7 +223,7 @@ public class StorageGroupProcessor {
       if (mergingMods.exists()) {
         mergingModification = new ModificationFile(mergingMods.getPath());
       }
-      RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles,
+      RecoverInplaceMergeTask recoverMergeTask = new RecoverInplaceMergeTask(seqTsFiles, unseqTsFiles,
           storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
           IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
       logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
@@ -878,7 +878,7 @@ public class StorageGroupProcessor {
         // cached during selection
         mergeResource.setCacheDeviceMeta(true);
 
-        MergeTask mergeTask = new MergeTask(mergeResource, storageGroupSysDir.getPath(),
+        InplaceMergeTask mergeTask = new InplaceMergeTask(mergeResource, storageGroupSysDir.getPath(),
             this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(), storageGroupName);
         mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
         MergeManager.getINSTANCE().submitMainTask(mergeTask);
@@ -962,7 +962,7 @@ public class StorageGroupProcessor {
     }
     // block new queries and insertions to prevent the seqFiles from changing
     writeLock();
-    mergeLock.writeLock().lock();;
+    mergeLock.writeLock().lock();
     try {
       removeUnseqFiles(unseqFiles);
       // insert the new file into a proper place
@@ -1025,8 +1025,21 @@ public class StorageGroupProcessor {
 
     if (unseqFiles.isEmpty()) {
       // merge runtime exception arose, just end this merge
-      isMerging = false;
-      logger.info("{} a merge task abnormally ends", storageGroupName);
+      mergeLock.writeLock().lock();
+      try {
+        if (mergingModification != null) {
+          logger.debug("{} is updating the merged file's modification file", storageGroupName);
+          mergingModification.remove();
+          mergingModification = null;
+        }
+      } catch (IOException e) {
+        logger.error("{} cannot remove merge modifications after merge abnormally ends",
+            storageGroupName, e);
+      } finally {
+        isMerging = false;
+        logger.info("{} a merge task abnormally ends", storageGroupName);
+        mergeLock.writeLock().unlock();
+      }
       return;
     }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
index 0c525c8..bfb1218 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
@@ -29,7 +29,7 @@ import java.io.IOException;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -58,8 +58,8 @@ public class MergeLogTest extends MergeTest {
 
   @Test
   public void testMergeLog() throws Exception {
-    MergeTask mergeTask =
-        new MergeTask(new MergeResource(seqResources.subList(0, 1), unseqResources.subList(0, 1)),
+    InplaceMergeTask mergeTask =
+        new InplaceMergeTask(new MergeResource(seqResources.subList(0, 1), unseqResources.subList(0, 1)),
             tempSGDir.getPath(), this::testCallBack, "test", false, 1, MERGE_TEST_SG);
     mergeTask.call();
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
index 67082fd..eae1982 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
@@ -25,7 +25,7 @@ import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 
@@ -43,8 +43,8 @@ public class MergePerfTest extends MergeTest{
     timeConsumption = System.currentTimeMillis();
     MergeResource resource = new MergeResource(seqResources, unseqResources);
     resource.setCacheDeviceMeta(true);
-    MergeTask mergeTask =
-        new MergeTask(resource, tempSGDir.getPath(), (k, v
+    InplaceMergeTask mergeTask =
+        new InplaceMergeTask(resource, tempSGDir.getPath(), (k, v
             , l) -> {}, "test", fullMerge, 100, MERGE_TEST_SG);
     mergeTask.call();
     timeConsumption = System.currentTimeMillis() - timeConsumption;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index dc9d6e1..0b2b951 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -27,7 +27,7 @@ import java.util.Collections;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.inplace.task.MergeTask;
+import org.apache.iotdb.db.engine.merge.inplace.task.InplaceMergeTask;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -60,9 +60,9 @@ public class MergeTaskTest extends MergeTest {
 
   @Test
   public void testMerge() throws Exception {
-    MergeTask mergeTask =
-        new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v
-            , l) -> {}, "test", false, 1, MERGE_TEST_SG);
+    InplaceMergeTask mergeTask =
+        new InplaceMergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v
+            , l, n) -> {}, "test", false, 1, MERGE_TEST_SG);
     mergeTask.call();
 
     QueryContext context = new QueryContext();
@@ -81,8 +81,9 @@ public class MergeTaskTest extends MergeTest {
 
   @Test
   public void testFullMerge() throws Exception {
-    MergeTask mergeTask =
-        new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v, l) -> {}, "test",
+    InplaceMergeTask mergeTask =
+        new InplaceMergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath()
+            , (k, v, l, n) -> {}, "test",
             true, 1, MERGE_TEST_SG);
     mergeTask.call();
 
@@ -103,8 +104,9 @@ public class MergeTaskTest extends MergeTest {
   @Test
   public void testChunkNumThreshold() throws Exception {
     IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(Integer.MAX_VALUE);
-    MergeTask mergeTask =
-        new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v, l) -> {}, "test",
+    InplaceMergeTask mergeTask =
+        new InplaceMergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath()
+            , (k, v, l, n) -> {}, "test",
             false, 1, MERGE_TEST_SG);
     mergeTask.call();
 
@@ -124,9 +126,9 @@ public class MergeTaskTest extends MergeTest {
 
   @Test
   public void testPartialMerge1() throws Exception {
-    MergeTask mergeTask =
-        new MergeTask(new MergeResource(seqResources, unseqResources.subList(0, 1)), tempSGDir.getPath(),
-            (k, v, l) -> {}, "test", false, 1, MERGE_TEST_SG);
+    InplaceMergeTask mergeTask =
+        new InplaceMergeTask(new MergeResource(seqResources, unseqResources.subList(0, 1)), tempSGDir.getPath(),
+            (k, v, l, n) -> {}, "test", false, 1, MERGE_TEST_SG);
     mergeTask.call();
 
     QueryContext context = new QueryContext();
@@ -149,9 +151,9 @@ public class MergeTaskTest extends MergeTest {
 
   @Test
   public void testPartialMerge2() throws Exception {
-    MergeTask mergeTask =
-        new MergeTask(new MergeResource(seqResources, unseqResources.subList(5, 6)), tempSGDir.getPath(),
-            (k, v, l) -> {}, "test", false, 1, MERGE_TEST_SG);
+    InplaceMergeTask mergeTask =
+        new InplaceMergeTask(new MergeResource(seqResources, unseqResources.subList(5, 6)), tempSGDir.getPath(),
+            (k, v, l, n) -> {}, "test", false, 1, MERGE_TEST_SG);
     mergeTask.call();
 
     QueryContext context = new QueryContext();
@@ -170,9 +172,9 @@ public class MergeTaskTest extends MergeTest {
 
   @Test
   public void testPartialMerge3() throws Exception {
-    MergeTask mergeTask =
-        new MergeTask(new MergeResource(seqResources, unseqResources.subList(0, 5)), tempSGDir.getPath(),
-            (k, v, l) -> {}, "test", false, 1, MERGE_TEST_SG);
+    InplaceMergeTask mergeTask =
+        new InplaceMergeTask(new MergeResource(seqResources, unseqResources.subList(0, 5)), tempSGDir.getPath(),
+            (k, v, l, n) -> {}, "test", false, 1, MERGE_TEST_SG);
     mergeTask.call();
 
     QueryContext context = new QueryContext();
@@ -200,9 +202,9 @@ public class MergeTaskTest extends MergeTest {
     seqResources.get(0).getModFile().close();
 
 
-    MergeTask mergeTask =
-        new MergeTask(new MergeResource(seqResources, unseqResources.subList(0, 1)), tempSGDir.getPath(),
-            (k, v, l) -> {
+    InplaceMergeTask mergeTask =
+        new InplaceMergeTask(new MergeResource(seqResources, unseqResources.subList(0, 1)), tempSGDir.getPath(),
+            (k, v, l, n) -> {
               try {
                 seqResources.get(0).removeModFile();
               } catch (IOException e) {