Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/11 08:53:08 UTC

[incubator-iotdb] branch dev_merge updated: add merge recovery

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_merge by this push:
     new ad82c80  add merge recovery
ad82c80 is described below

commit ad82c803313466780fe1795fdf290bfc985b296c
Author: 江天 <jt...@163.com>
AuthorDate: Thu Jul 11 16:50:48 2019 +0800

    add merge recovery
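
    For reference, the merge log ("merge.log") written by MergeLogger is
    line-oriented and looks roughly like the following (paths and positions
    are only illustrative):

        seqFiles
        /data/sg1/1562831448076-101.tsfile
        unseqFiles
        /data/sg1/1562831460000-103.tsfile
        merge start
        root.sg1.d1.s1 start
        /data/sg1/1562831448076-101.tsfile.merge 1024
        root.sg1.d1.s1 end
        all ts end
        /data/sg1/1562831448076-101.tsfile start 2048
        /data/sg1/1562831448076-101.tsfile end
        merge end

    RecoverMergeTask scans this log to decide whether the previous merge only
    logged the files, merged all timeseries, or already ended, and then
    continues or discards the merge accordingly.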
---
 .../apache/iotdb/db/engine/merge/MergeLogger.java  | 117 ++++++++
 .../apache/iotdb/db/engine/merge/MergeTask.java    | 151 ++++++++---
 .../iotdb/db/engine/merge/RecoverMergeTask.java    | 296 +++++++++++++++++++++
 .../db/engine/storagegroup/TsFileResource.java     |   6 +-
 .../write/TsFileNotCompleteException.java          |  40 +++
 .../org/apache/iotdb/tsfile/read/common/Path.java  |   2 +-
 .../write/writer/ForceAppendTsFileWriter.java      |  52 +---
 .../write/writer/RestorableTsFileIOWriter.java     |   2 +-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  39 ++-
 9 files changed, 619 insertions(+), 86 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeLogger.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeLogger.java
new file mode 100644
index 0000000..64aeac9
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeLogger.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.read.common.Path;
+
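+/**
+ * MergeLogger records the progress of a merge into a "merge.log" file so that an interrupted
+ * merge can be continued or discarded by RecoverMergeTask after a restart.
+ */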
+public class MergeLogger {
+
+  public static final String MERGE_LOG_NAME = "merge.log";
+
+  public static final String STR_SEQ_FILES = "seqFiles";
+  public static final String STR_UNSEQ_FILES = "unseqFiles";
+  public static final String STR_START = "start";
+  public static final String STR_END = "end";
+  public static final String STR_ALL_TS_END = "all ts end";
+  public static final String STR_MERGE_START = "merge start";
+  public static final String STR_MERGE_END = "merge end";
+
+  private BufferedWriter logStream;
+
+  public MergeLogger(String storageGroupDir) throws IOException {
+    logStream = new BufferedWriter(new FileWriter(new File(storageGroupDir, MERGE_LOG_NAME), true));
+  }
+
+  public void close() throws IOException {
+    logStream.close();
+  }
+
+  public void logSeqFiles(List<TsFileResource> seqFiles) throws IOException {
+    logStream.write(STR_SEQ_FILES);
+    logStream.newLine();
+    for (TsFileResource tsFileResource : seqFiles) {
+      // one absolute path per line, so that recovery can read the list back line by line
+      logStream.write(tsFileResource.getFile().getAbsolutePath());
+      logStream.newLine();
+    }
+    logStream.flush();
+  }
+
+  public void logUnseqFiles(List<TsFileResource> unseqFiles) throws IOException {
+    logStream.write(STR_UNSEQ_FILES);
+    logStream.newLine();
+    for (TsFileResource tsFileResource : unseqFiles) {
+      logStream.write(tsFileResource.getFile().getAbsolutePath());
+      logStream.newLine();
+    }
+    logStream.flush();
+  }
+
+  public void logMergeStart() throws IOException {
+    logStream.write(STR_MERGE_START);
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logTSStart(Path path) throws IOException {
+    logStream.write(path.getFullPath() + " " + STR_START);
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logFilePositionUpdate(File file) throws IOException {
+    logStream.write(String.format("%s %d", file.getAbsolutePath(), file.length()));
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logTSEnd(Path path) throws IOException {
+    logStream.write(path.getFullPath() + " " + STR_END);
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logAllTsEnd() throws IOException {
+    logStream.write(STR_ALL_TS_END);
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logFileMergeStart(File file, long position) throws IOException {
+    // include STR_START so that RecoverMergeTask.analyzeMergedFiles can distinguish this line
+    // from a file-end line and read the position from the third token
+    logStream.write(String.format("%s %s %d", file.getAbsolutePath(), STR_START, position));
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logFileMergeEnd(File file) throws IOException {
+    logStream.write(file.getAbsolutePath() + " " + STR_END);
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logMergeEnd() throws IOException {
+    logStream.write(STR_MERGE_END);
+    logStream.newLine();
+    logStream.flush();
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
index d0df83b..4ddbc2e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactoryImpl;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -73,22 +74,24 @@ import org.slf4j.LoggerFactory;
 public class MergeTask implements Callable<Void> {
 
   private static final int MIN_CHUNK_POINT_NUM = 4096;
-  private static final String MERGE_SUFFIX = ".merge";
+  public static final String MERGE_SUFFIX = ".merge";
   private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
 
-  private List<TsFileResource> seqFiles;
-  private List<TsFileResource> unseqFiles;
+  protected List<TsFileResource> seqFiles;
+  protected List<TsFileResource> unseqFiles;
+  protected String storageGroupDir;
+  protected MergeLogger mergeLogger;
 
   private TimeValuePair currTimeValuePair;
 
-  private Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
-  private Map<TsFileResource, RestorableTsFileIOWriter> fileWriterCache = new HashMap<>();
-  private Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
-  private Map<TsFileResource, MetadataQuerier> metadataQuerierCache = new HashMap<>();
+  protected Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
+  protected Map<TsFileResource, RestorableTsFileIOWriter> fileWriterCache = new HashMap<>();
+  protected Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
+  protected Map<TsFileResource, MetadataQuerier> metadataQuerierCache = new HashMap<>();
 
-  private Map<TsFileResource, Integer> mergedChunkCnt = new HashMap<>();
-  private Map<TsFileResource, Integer> unmergedChunkCnt = new HashMap<>();
-  private Map<TsFileResource, Map<Path, List<Long>>> unmergedChunkStartTimes = new HashMap<>();
+  protected Map<TsFileResource, Integer> mergedChunkCnt = new HashMap<>();
+  protected Map<TsFileResource, Integer> unmergedChunkCnt = new HashMap<>();
+  protected Map<TsFileResource, Map<Path, List<Long>>> unmergedChunkStartTimes = new HashMap<>();
 
   private QueryContext mergeContext = new QueryContext();
 
@@ -96,10 +99,16 @@ public class MergeTask implements Callable<Void> {
 
   private long currDeviceMaxTime;
 
+  protected MergeTask() {
+  }
+
   public MergeTask(List<TsFileResource> seqFiles,
-      List<TsFileResource> unseqFiles) {
+      List<TsFileResource> unseqFiles, String storageGroupDir) throws IOException {
     this.seqFiles = seqFiles;
     this.unseqFiles = unseqFiles;
+    this.storageGroupDir = storageGroupDir;
+    this.mergeLogger = new MergeLogger(storageGroupDir);
   }
 
   @Override
@@ -109,20 +118,22 @@ public class MergeTask implements Callable<Void> {
   }
 
   private void doMerge() throws MetadataErrorException, IOException {
+
+    logFiles();
+
     List<Path> unmergedSeries = collectPathsInUnseqFiles();
+    mergeSeries(unmergedSeries);
 
-    for (TsFileResource seqFile : seqFiles) {
-      unmergedChunkStartTimes.put(seqFile, new HashMap<>());
-    }
+    List<TsFileResource> unmergedFiles = seqFiles;
+    mergeFiles(unmergedFiles);
 
-    // merge each series and write data into each seqFile's temp merge file
-    for (Path path : unmergedSeries) {
-      mergeOnePath(path);
-    }
+    cleanUp(true);
+  }
 
+  protected void mergeFiles(List<TsFileResource> unmergedFiles) throws IOException {
     // decide whether to write the unmerged chunks to the merge files or to move the merged data
     // back to the original seqFiles
-    for (TsFileResource seqFile : seqFiles) {
+    for (TsFileResource seqFile : unmergedFiles) {
       int mergedChunkNum = mergedChunkCnt.getOrDefault(seqFile, 0);
       int unmergedChunkNum = unmergedChunkCnt.getOrDefault(seqFile, 0);
       if (mergedChunkNum >= unmergedChunkNum) {
@@ -133,11 +144,30 @@ public class MergeTask implements Callable<Void> {
         moveMergedToOld(seqFile);
       }
     }
+    mergeLogger.logMergeEnd();
+  }
+
+  protected void logFiles() throws IOException {
+    mergeLogger.logSeqFiles(seqFiles);
+    mergeLogger.logUnseqFiles(unseqFiles);
+    mergeLogger.logMergeStart();
+  }
 
-    cleanUp();
+  protected void mergeSeries(List<Path> unmergedSeries) throws IOException {
+    for (TsFileResource seqFile : seqFiles) {
+      unmergedChunkStartTimes.put(seqFile, new HashMap<>());
+    }
+    // merge each series and write data into each seqFile's temp merge file
+    for (Path path : unmergedSeries) {
+      mergeLogger.logTSStart(path);
+      mergeOnePath(path);
+      mergeLogger.logTSEnd(path);
+    }
+    mergeLogger.logAllTsEnd();
   }
 
-  private void cleanUp() throws IOException {
+  protected void cleanUp(boolean executeCallback) throws IOException {
     for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
       sequenceReader.close();
     }
@@ -149,12 +179,38 @@ public class MergeTask implements Callable<Void> {
     mergedChunkCnt.clear();
     unmergedChunkCnt.clear();
     unmergedChunkStartTimes.clear();
+
+    mergeLogger.close();
+    File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
+    logFile.delete();
+    for (TsFileResource seqFile : seqFiles) {
+      File mergeFile = new File(seqFile.getFile().getPath() + MERGE_SUFFIX);
+      mergeFile.delete();
+    }
   }
 
   private void moveMergedToOld(TsFileResource seqFile) throws IOException {
+    int mergedChunkNum = mergedChunkCnt.getOrDefault(seqFile, 0);
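+    // if no chunk of this file was merged, just discard its temp merge file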
+    if (mergedChunkNum == 0) {
+      RestorableTsFileIOWriter newFileWriter = getMergeFileWriter(seqFile);
+      newFileWriter.close();
+      newFileWriter.getFile().delete();
+      fileWriterCache.remove(seqFile);
+      return;
+    }
+
     seqFile.getMergeQueryLock().writeLock().lock();
     try {
-      ForceAppendTsFileWriter oldFileWriter = new ForceAppendTsFileWriter(seqFile.getFile());
+      TsFileIOWriter oldFileWriter;
+      try {
+        oldFileWriter = new ForceAppendTsFileWriter(seqFile.getFile());
+        mergeLogger.logFileMergeStart(seqFile.getFile(), ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
+        ((ForceAppendTsFileWriter) oldFileWriter).doTruncate();
+      } catch (TsFileNotCompleteException e) {
+        // the file may already have been truncated if this merge is recovering from a reboot
+        oldFileWriter = new RestorableTsFileIOWriter(seqFile.getFile());
+      }
+
       oldFileWriter.filterChunks(unmergedChunkStartTimes.get(seqFile));
 
       RestorableTsFileIOWriter newFileWriter = getMergeFileWriter(seqFile);
@@ -168,6 +224,10 @@ public class MergeTask implements Callable<Void> {
         }
       }
       oldFileWriter.endFile(new FileSchema(oldFileWriter.getKnownSchema()));
+
+      seqFile.serialize();
+      mergeLogger.logFileMergeEnd(seqFile.getFile());
+
       newFileWriter.getFile().delete();
     } finally {
       seqFile.getMergeQueryLock().writeLock().unlock();
@@ -192,31 +252,39 @@ public class MergeTask implements Callable<Void> {
     fileWriter.endChunkGroup(maxVersion);
   }
 
-
-
   private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
     Map<Path, List<Long>> fileUnmergedChunkStartTimes = this.unmergedChunkStartTimes.get(seqFile);
     RestorableTsFileIOWriter fileWriter = getMergeFileWriter(seqFile);
     ChunkLoader chunkLoader = new ChunkLoaderImpl(getFileReader(seqFile));
 
-    for (Entry<Path, List<Long>> entry : fileUnmergedChunkStartTimes.entrySet()) {
-      Path path = entry.getKey();
-      List<Long> chunkStartTimes = entry.getValue();
-      if (chunkStartTimes.isEmpty()) {
-        continue;
-      }
+    mergeLogger.logFileMergeStart(fileWriter.getFile(), fileWriter.getFile().length());
 
-      List<ChunkMetaData> chunkMetaDataList = queryChunkMetadata(path, seqFile);
-      MeasurementSchema measurementSchema = getSchema(path);
-      IChunkWriter chunkWriter = getChunkWriter(measurementSchema);
+    int unmergedChunkNum = unmergedChunkCnt.getOrDefault(seqFile, 0);
 
-      fileWriter.startChunkGroup(path.getDevice());
-      long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetaDataList, chunkLoader,
-          chunkWriter, measurementSchema, fileWriter);
-      fileWriter.endChunkGroup(maxVersion);
+    if (unmergedChunkNum > 0) {
+      for (Entry<Path, List<Long>> entry : fileUnmergedChunkStartTimes.entrySet()) {
+        Path path = entry.getKey();
+        List<Long> chunkStartTimes = entry.getValue();
+        if (chunkStartTimes.isEmpty()) {
+          continue;
+        }
+
+        List<ChunkMetaData> chunkMetaDataList = queryChunkMetadata(path, seqFile);
+        MeasurementSchema measurementSchema = getSchema(path);
+        IChunkWriter chunkWriter = getChunkWriter(measurementSchema);
+
+        fileWriter.startChunkGroup(path.getDevice());
+        long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetaDataList, chunkLoader,
+            chunkWriter, measurementSchema, fileWriter);
+        fileWriter.endChunkGroup(maxVersion);
+      }
     }
+
     fileWriter.endFile(new FileSchema(fileWriter.getKnownSchema()));
 
+    seqFile.serialize();
+    mergeLogger.logFileMergeEnd(fileWriter.getFile());
+
     seqFile.getMergeQueryLock().writeLock().lock();
     try {
       FileUtils.moveFile(fileWriter.getFile(), seqFile.getFile());
@@ -298,6 +366,7 @@ public class MergeTask implements Callable<Void> {
     if (mergeChunks(seqChunkMeta, fileLimitTime, chunkLoader, measurementSchema,
         unseqReader, mergeFileWriter, currTsFile, path)) {
       mergeFileWriter.endChunkGroup(seqChunkMeta.get(seqChunkMeta.size() - 1).getVersion());
+      mergeLogger.logFilePositionUpdate(mergeFileWriter.getFile());
     }
     currTsFile.updateTime(path.getDevice(), currDeviceMaxTime);
   }
@@ -455,7 +524,7 @@ public class MergeTask implements Callable<Void> {
     return pathModifications;
   }
 
-  private List<Path> collectPathsInUnseqFiles() throws MetadataErrorException {
+  protected List<Path> collectPathsInUnseqFiles() throws MetadataErrorException {
     Set<String> devices = new HashSet<>();
     for (TsFileResource tsFileResource : unseqFiles) {
       devices.addAll(tsFileResource.getEndTimeMap().keySet());
@@ -481,7 +550,7 @@ public class MergeTask implements Callable<Void> {
     return null;
   }
 
-  private RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
+  protected RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
     RestorableTsFileIOWriter writer = fileWriterCache.get(resource);
     if (writer == null) {
       writer = new RestorableTsFileIOWriter(new File(resource.getFile().getPath() + MERGE_SUFFIX));
@@ -490,7 +559,7 @@ public class MergeTask implements Callable<Void> {
     return writer;
   }
 
-  private MetadataQuerier getMetadataQuerier(TsFileResource seqFile) throws IOException {
+  protected MetadataQuerier getMetadataQuerier(TsFileResource seqFile) throws IOException {
     MetadataQuerier metadataQuerier = metadataQuerierCache.get(seqFile);
     if (metadataQuerier == null) {
       metadataQuerier = new MetadataQuerierByFileImpl(getFileReader(seqFile));
@@ -499,7 +568,7 @@ public class MergeTask implements Callable<Void> {
     return metadataQuerier;
   }
 
-  private List<ChunkMetaData> queryChunkMetadata(Path path, TsFileResource seqFile)
+  protected List<ChunkMetaData> queryChunkMetadata(Path path, TsFileResource seqFile)
       throws IOException {
     MetadataQuerier metadataQuerier = getMetadataQuerier(seqFile);
     List<ChunkMetaData> chunkMetaDataList = metadataQuerier.getChunkMetaDataList(path);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
new file mode 100644
index 0000000..9234443
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.MetadataErrorException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
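+/**
+ * RecoverMergeTask replays "merge.log" to find out how far the previous merge proceeded,
+ * truncates the touched files back to their last logged positions, and then either continues
+ * the merge or discards it.
+ */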
+public class RecoverMergeTask extends MergeTask {
+
+  private String currLine;
+  private List<TsFileResource> mergeSeqFiles = new ArrayList<>();
+  private List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
+
+  private Map<File, Long> fileLastPositions = new HashMap<>();
+  private Map<File, Long> tempFileLastPositions = new HashMap<>();
+
+  private List<Path> unmergedPaths;
+  private List<Path> mergedPaths = new ArrayList<>();
+  private List<TsFileResource> unmergedFiles;
+
+  public RecoverMergeTask(
+      List<TsFileResource> allSeqFiles,
+      List<TsFileResource> allUnseqFiles,
+      String storageGroupDir) throws IOException {
+    super();
+    this.seqFiles = allSeqFiles;
+    this.unseqFiles = allUnseqFiles;
+    this.storageGroupDir = storageGroupDir;
+  }
+
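+  /**
+   * Recover an interrupted merge if a merge.log exists in the storage group directory.
+   * @param continueMerge if true, resume the unfinished merge from where it stopped;
+   *                      if false, discard the partial merge and only clean up
+   */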
+  public void recoverMerge(boolean continueMerge) throws IOException, MetadataErrorException {
+    File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
+    if (!logFile.exists()) {
+      return;
+    }
+    mergeSeqFiles = new ArrayList<>();
+    mergeUnseqFiles = new ArrayList<>();
+
+    Status status = determineStatus(logFile);
+    switch (status) {
+      case NONE:
+        logFile.delete();
+        break;
+      case FILES_LOGGED:
+        if (continueMerge) {
+          mergeLogger = new MergeLogger(storageGroupDir);
+          truncateFiles();
+          recoverChunkCounts();
+          mergeSeries(unmergedPaths);
+
+          mergeFiles(seqFiles);
+        }
+        cleanUp(continueMerge);
+        break;
+      case ALL_TS_MERGED:
+        if (continueMerge) {
+          mergeLogger = new MergeLogger(storageGroupDir);
+          truncateFiles();
+          recoverChunkCounts();
+          mergeFiles(unmergedFiles);
+        } else {
+          // NOTICE: although some of the seqFiles may have been truncated, later TsFile recovery
+          // will recover them, so they are not a concern here
+          truncateFiles();
+        }
+        cleanUp(continueMerge);
+        break;
+      case MERGE_END:
+        cleanUp(continueMerge);
+        break;
+    }
+  }
+
+  // scan the metadata to count how many chunks of each seqFile are merged/unmerged, so that
+  // we can later decide whether to move the merged chunks or the unmerged ones
+  private void recoverChunkCounts() throws IOException {
+    for (TsFileResource tsFileResource : seqFiles) {
+      RestorableTsFileIOWriter mergeFileWriter = getMergeFileWriter(tsFileResource);
+      mergeFileWriter.makeMetadataVisible();
+      unmergedChunkStartTimes.put(tsFileResource, new HashMap<>());
+      for (Path path : mergedPaths) {
+        recoverChunkCounts(path, tsFileResource, mergeFileWriter);
+      }
+    }
+  }
+
+  private void recoverChunkCounts(Path path, TsFileResource tsFileResource,
+      RestorableTsFileIOWriter mergeFileWriter) throws IOException {
+    unmergedChunkStartTimes.get(tsFileResource).put(path, new ArrayList<>());
+
+    List<ChunkMetaData> seqFileChunks = queryChunkMetadata(path, tsFileResource);
+    List<ChunkMetaData> mergeFileChunks =
+        mergeFileWriter.getVisibleMetadatas(path.getDevice(), path.getMeasurement(), null);
+    mergedChunkCnt.compute(tsFileResource, (k, v) -> v == null ? mergeFileChunks.size() :
+        v + mergeFileChunks.size());
+    int seqIndex = 0;
+    int mergeIndex = 0;
+    int unmergedCnt = 0;
+    while (seqIndex < seqFileChunks.size() && mergeIndex < mergeFileChunks.size()) {
+      ChunkMetaData seqChunk = seqFileChunks.get(seqIndex);
+      ChunkMetaData mergedChunk = mergeFileChunks.get(mergeIndex);
+      if (seqChunk.getStartTime() < mergedChunk.getStartTime()) {
+        // this seqChunk is unmerged
+        unmergedCnt++;
+        seqIndex++;
+        unmergedChunkStartTimes.get(tsFileResource).get(path).add(seqChunk.getStartTime());
+      } else if (mergedChunk.getStartTime() <= seqChunk.getStartTime() &&
+          seqChunk.getStartTime() <= mergedChunk.getEndTime()) {
+        // this seqChunk is merged
+        seqIndex++;
+      } else {
+        // seqChunk.startTime > mergedChunk.endTime, find the next mergedChunk that may cover
+        // the seqChunk
+        mergeIndex++;
+      }
+    }
+    int finalUnmergedCnt = unmergedCnt;
+    unmergedChunkCnt.compute(tsFileResource, (k, v) -> v == null ? finalUnmergedCnt :
+        v + finalUnmergedCnt);
+  }
+
+  private void truncateFiles() throws IOException {
+    for (Entry<File, Long> entry : fileLastPositions.entrySet()) {
+      File file = entry.getKey();
+      Long lastPosition = entry.getValue();
+      if (file.exists() && file.length() != lastPosition) {
+        // a channel obtained from a FileInputStream is read-only and cannot truncate,
+        // so open the file for writing instead
+        try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE)) {
+          channel.truncate(lastPosition);
+        }
+      }
+    }
+  }
+
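+  // read merge.log from the beginning and advance the status each time a stage marker
+  // ("seqFiles", "unseqFiles", "merge start", "all ts end", "merge end") is passed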
+  private Status determineStatus(File logFile) throws IOException, MetadataErrorException {
+    Status status = Status.NONE;
+    try (BufferedReader bufferedReader =
+        new BufferedReader(new FileReader(logFile))) {
+      currLine = bufferedReader.readLine();
+      if (currLine != null) {
+        // the log may stop after any stage, so currLine can become null once a sub-analyzer
+        // has consumed the rest of the file; compare constants first to avoid NPEs
+        if (MergeLogger.STR_SEQ_FILES.equals(currLine)) {
+          analyzeSeqFiles(bufferedReader);
+        }
+        if (MergeLogger.STR_UNSEQ_FILES.equals(currLine)) {
+          analyzeUnseqFiles(bufferedReader);
+        }
+        if (MergeLogger.STR_MERGE_START.equals(currLine)) {
+          status = Status.FILES_LOGGED;
+          seqFiles = mergeSeqFiles;
+          unseqFiles = mergeUnseqFiles;
+          for (TsFileResource seqFile : seqFiles) {
+            File mergeFile = new File(seqFile.getFile().getPath() + MergeTask.MERGE_SUFFIX);
+            fileLastPositions.put(mergeFile, 0L);
+          }
+          unmergedPaths = collectPathsInUnseqFiles();
+          analyzeMergedSeries(bufferedReader, unmergedPaths);
+        }
+        if (MergeLogger.STR_ALL_TS_END.equals(currLine)) {
+          status = Status.ALL_TS_MERGED;
+          unmergedFiles = seqFiles;
+          analyzeMergedFiles(bufferedReader);
+        }
+        if (MergeLogger.STR_MERGE_END.equals(currLine)) {
+          status = Status.MERGE_END;
+        }
+      }
+    }
+    return status;
+  }
+
+  private void analyzeSeqFiles(BufferedReader bufferedReader) throws IOException {
+    while ((currLine = bufferedReader.readLine()) != null) {
+      if (currLine.equals(MergeLogger.STR_UNSEQ_FILES)) {
+        return;
+      }
+      Iterator<TsFileResource> iterator = seqFiles.iterator();
+      while (iterator.hasNext()) {
+        TsFileResource seqFile = iterator.next();
+        if (seqFile.getFile().getAbsolutePath().equals(currLine)) {
+          mergeSeqFiles.add(seqFile);
+          iterator.remove();
+          break;
+        }
+      }
+    }
+  }
+
+  private void analyzeUnseqFiles(BufferedReader bufferedReader) throws IOException {
+    while ((currLine = bufferedReader.readLine()) != null) {
+      if (currLine.equals(MergeLogger.STR_MERGE_START)) {
+        return;
+      }
+      Iterator<TsFileResource> iterator = unseqFiles.iterator();
+      while (iterator.hasNext()) {
+        TsFileResource unseqFile = iterator.next();
+        if (unseqFile.getFile().getAbsolutePath().equals(currLine)) {
+          mergeUnseqFiles.add(unseqFile);
+          iterator.remove();
+          break;
+        }
+      }
+    }
+  }
+
+  private void analyzeMergedSeries(BufferedReader bufferedReader, List<Path> unmergedPaths) throws IOException {
+    Path currTS = null;
+    while ((currLine = bufferedReader.readLine()) != null) {
+      if (currLine.equals(MergeLogger.STR_ALL_TS_END)) {
+        return;
+      }
+      if (currLine.contains(MergeLogger.STR_START)) {
+        // a TS starts to merge
+        String[] splits = currLine.split(" ");
+        currTS = new Path(splits[0]);
+        tempFileLastPositions.clear();
+      } else if (!currLine.contains(MergeLogger.STR_END)) {
+        // file position
+        String[] splits = currLine.split(" ");
+        File file = new File(splits[0]);
+        Long position = Long.parseLong(splits[1]);
+        tempFileLastPositions.put(file, position);
+      } else {
+        // a TS ends merging
+        unmergedPaths.remove(currTS);
+        for (Entry<File, Long> entry : tempFileLastPositions.entrySet()) {
+          fileLastPositions.put(entry.getKey(), entry.getValue());
+        }
+        mergedPaths.add(currTS);
+      }
+    }
+  }
+
+  private void analyzeMergedFiles(BufferedReader bufferedReader) throws IOException {
+    File currFile = null;
+    while ((currLine = bufferedReader.readLine()) != null) {
+      if (currLine.equals(MergeLogger.STR_MERGE_END)) {
+        return;
+      }
+      if (currLine.contains(MergeLogger.STR_START)) {
+        String[] splits = currLine.split(" ");
+        currFile = new File(splits[0]);
+        long lastPosition = Long.parseLong(splits[2]);
+        fileLastPositions.put(currFile, lastPosition);
+      } else if (currLine.contains(MergeLogger.STR_END)) {
+        fileLastPositions.remove(currFile);
+        String seqFilePath = currFile.getAbsolutePath().replace(MergeTask.MERGE_SUFFIX, "");
+        Iterator<TsFileResource> unmergedFileIter = unmergedFiles.iterator();
+        while (unmergedFileIter.hasNext()) {
+          TsFileResource seqFile = unmergedFileIter.next();
+          if (seqFile.getFile().getAbsolutePath().equals(seqFilePath)) {
+            unmergedFileIter.remove();
+            break;
+          }
+        }
+      }
+    }
+  }
+
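+  /**
+   * NONE: the log contains no complete stage;
+   * FILES_LOGGED: the files to be merged were logged, but not all series were merged;
+   * ALL_TS_MERGED: every timeseries was merged into the temp merge files;
+   * MERGE_END: the previous merge completed normally.
+   */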
+  enum Status {
+    NONE, FILES_LOGGED, ALL_TS_MERGED, MERGE_END
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 6744473..df979c8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -42,6 +43,7 @@ public class TsFileResource {
   private File file;
 
   public static final String RESOURCE_SUFFIX = ".resource";
+  public static final String TEMP_SUFFIX = ".temp";
 
   /**
    * device -> start time
@@ -109,7 +111,7 @@ public class TsFileResource {
 
   public void serialize() throws IOException {
     try (OutputStream outputStream = new BufferedOutputStream(
-        new FileOutputStream(file + RESOURCE_SUFFIX))) {
+        new FileOutputStream(file + RESOURCE_SUFFIX + TEMP_SUFFIX))) {
       ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream);
       for (Entry<String, Long> entry : this.startTimeMap.entrySet()) {
         ReadWriteIOUtils.write(entry.getKey(), outputStream);
@@ -121,6 +123,8 @@ public class TsFileResource {
         ReadWriteIOUtils.write(entry.getValue(), outputStream);
       }
     }
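+    // write to a temp file first and then move it into place, so that a crash during
+    // serialization cannot leave a half-written .resource file behind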
+    FileUtils.moveFile(new File(file + RESOURCE_SUFFIX + TEMP_SUFFIX),
+        new File(file + RESOURCE_SUFFIX));
   }
 
   public void deSerialize() throws IOException {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/write/TsFileNotCompleteException.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/write/TsFileNotCompleteException.java
new file mode 100644
index 0000000..aa38fd9
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/write/TsFileNotCompleteException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.exception.write;
+
+import java.io.IOException;
+
+public class TsFileNotCompleteException extends IOException {
+
+  public TsFileNotCompleteException() {
+  }
+
+  public TsFileNotCompleteException(String message) {
+    super(message);
+  }
+
+  public TsFileNotCompleteException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public TsFileNotCompleteException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
index 3d448d7..d552cd4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
@@ -189,7 +189,7 @@ public class Path implements Serializable {
 
   @Override
   public boolean equals(Object obj) {
-    return obj != null && obj instanceof Path && this.fullPath.equals(((Path) obj).fullPath);
+    return obj instanceof Path && this.fullPath.equals(((Path) obj).fullPath);
   }
 
   public boolean equals(String obj) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
index 736b080..b5b0382 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
@@ -21,17 +21,13 @@ package org.apache.iotdb.tsfile.write.writer;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
 import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 /**
@@ -41,6 +37,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 public class ForceAppendTsFileWriter extends TsFileIOWriter{
 
   private Map<String, MeasurementSchema> knownSchemas = new HashMap<>();
+  private long truncatePosition;
 
   public ForceAppendTsFileWriter(File file) throws IOException {
     this.out = new DefaultTsFileOutput(file, true);
@@ -48,14 +45,14 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter{
 
     // the file does not exist or is empty
     if (file.length() == 0 || !file.exists()) {
-      throw new IOException("File " + file.getPath() + " is not a complete TsFile");
+      throw new TsFileNotCompleteException("File " + file.getPath() + " is not a complete TsFile");
     }
 
     try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), true)) {
 
       // this tsfile is not complete
       if (!reader.isComplete()) {
-        throw new IOException("File " + file.getPath() + " is not a complete TsFile");
+        throw new TsFileNotCompleteException("File " + file.getPath() + " is not a complete TsFile");
       }
       TsFileMetaData fileMetaData = reader.readFileMetadata();
       Map<String, TsDeviceMetadataIndex> deviceMap = fileMetaData.getDeviceMap();
@@ -69,45 +66,18 @@ public class ForceAppendTsFileWriter extends TsFileIOWriter{
             deviceMetadataIndex.getOffset() : firstDeviceMetaPos;
       }
       // truncate metadata and marker
-      out.truncate(firstDeviceMetaPos - 1);
+      truncatePosition = firstDeviceMetaPos - 1;
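+      // the actual truncation is deferred until doTruncate() is called, so that the caller
+      // can log the truncate position before the file is modified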
       knownSchemas = fileMetaData.getMeasurementSchema();
     }
   }
 
-  /**
-   * Remove such ChunkMetadata that its startTime is not in chunkStartTimes
-   * @param chunkStartTimes
-   */
-  public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
-    Map<Path, Integer> startTimeIdxes = new HashMap<>();
-    chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));
+  public void doTruncate() throws IOException {
+    out.truncate(truncatePosition);
+  }
 
-    Iterator<ChunkGroupMetaData> chunkGroupMetaDataIterator = chunkGroupMetaDataList.iterator();
-    while (chunkGroupMetaDataIterator.hasNext()) {
-      ChunkGroupMetaData chunkGroupMetaData = chunkGroupMetaDataIterator.next();
-      String deviceId = chunkGroupMetaData.getDeviceID();
-      int chunkNum = chunkGroupMetaData.getChunkMetaDataList().size();
-      Iterator<ChunkMetaData> chunkMetaDataIterator =
-          chunkGroupMetaData.getChunkMetaDataList().iterator();
-      while (chunkMetaDataIterator.hasNext()) {
-        ChunkMetaData chunkMetaData = chunkMetaDataIterator.next();
-        Path path = new Path(deviceId, chunkMetaData.getMeasurementUid());
-        int startTimeIdx = startTimeIdxes.get(path);
-        List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
-        boolean chunkValid = startTimeIdx < pathChunkStartTimes.size()
-            && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
-        if (!chunkValid) {
-          chunkGroupMetaDataIterator.remove();
-          chunkNum--;
-          invalidChunkNum++;
-        } else {
-          startTimeIdxes.put(path, startTimeIdx + 1);
-        }
-      }
-      if (chunkNum == 0) {
-        chunkGroupMetaDataIterator.remove();
-      }
-    }
+  public long getTruncatePosition() {
+    return truncatePosition;
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index e7802a6..2c2399a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -128,7 +128,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
         // filter: if a device's sensor is defined as float type and some data has already been
         // persisted, and then someone deletes the timeseries and recreates it with int type,
         // we have to ignore all the stale data.
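+        // a null dataType means the caller does not filter by data type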
-        if (dataType.equals(chunkMetaData.getTsDataType())) {
+        if (dataType == null || dataType.equals(chunkMetaData.getTsDataType())) {
           chunkMetaDatas.add(chunkMetaData);
         }
       }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 2828b6f..f4750a3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -42,6 +43,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -356,7 +358,6 @@ public class TsFileIOWriter {
 
   /**
    * close the outputStream or file channel without writing FileMetadata.
-   * This is just used for Testing.
    */
   public void close() throws IOException {
     canWrite = false;
@@ -389,4 +390,40 @@ public class TsFileIOWriter {
   public File getFile() {
     return file;
   }
+
+  /**
+   * Remove each ChunkMetadata whose startTime is not contained in chunkStartTimes.
+   * @param chunkStartTimes the start times, per path, of the chunks that should be kept
+   */
+  public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
+    Map<Path, Integer> startTimeIdxes = new HashMap<>();
+    chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));
+
+    Iterator<ChunkGroupMetaData> chunkGroupMetaDataIterator = chunkGroupMetaDataList.iterator();
+    while (chunkGroupMetaDataIterator.hasNext()) {
+      ChunkGroupMetaData chunkGroupMetaData = chunkGroupMetaDataIterator.next();
+      String deviceId = chunkGroupMetaData.getDeviceID();
+      int chunkNum = chunkGroupMetaData.getChunkMetaDataList().size();
+      Iterator<ChunkMetaData> chunkMetaDataIterator =
+          chunkGroupMetaData.getChunkMetaDataList().iterator();
+      while (chunkMetaDataIterator.hasNext()) {
+        ChunkMetaData chunkMetaData = chunkMetaDataIterator.next();
+        Path path = new Path(deviceId, chunkMetaData.getMeasurementUid());
+        int startTimeIdx = startTimeIdxes.get(path);
+        List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
+        boolean chunkValid = startTimeIdx < pathChunkStartTimes.size()
+            && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
+        if (!chunkValid) {
+          chunkGroupMetaDataIterator.remove();
+          chunkNum--;
+          invalidChunkNum++;
+        } else {
+          startTimeIdxes.put(path, startTimeIdx + 1);
+        }
+      }
+      if (chunkNum == 0) {
+        chunkGroupMetaDataIterator.remove();
+      }
+    }
+  }
 }