You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/17 03:36:13 UTC

[incubator-iotdb] branch dev_merge updated: refactor RecoverMergeTask

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_merge by this push:
     new b5b7400  refactor RecoverMergeTask
b5b7400 is described below

commit b5b7400d00b21aa6a94b406fe572f2be2e3698ab
Author: 江天 <jt...@163.com>
AuthorDate: Wed Jul 17 11:33:49 2019 +0800

    refactor RecoverMergeTask
---
 .../LogAnalyzer.java}                              | 188 ++++------------
 .../iotdb/db/engine/merge/task/MergeTask.java      |   1 +
 .../db/engine/merge/task/RecoverMergeTask.java     | 239 ++++-----------------
 3 files changed, 87 insertions(+), 341 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
similarity index 52%
copy from iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
copy to iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
index ebcc425..5c7a004 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
@@ -17,175 +17,49 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.engine.merge.task;
+package org.apache.iotdb.db.engine.merge.recover;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
-import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.db.utils.MergeUtils;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RecoverMergeTask extends MergeTask {
+public class LogAnalyzer {
 
-  private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
+  private static final Logger logger = LoggerFactory.getLogger(LogAnalyzer.class);
 
-  private String currLine;
-  private List<TsFileResource> mergeSeqFiles = new ArrayList<>();
-  private List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
+  private MergeResource resource;
+  private String taskName;
+  private File logFile;
 
   private Map<File, Long> fileLastPositions = new HashMap<>();
   private Map<File, Long> tempFileLastPositions = new HashMap<>();
 
-  private List<Path> unmergedPaths;
   private List<Path> mergedPaths = new ArrayList<>();
+  private List<Path> unmergedPaths;
   private List<TsFileResource> unmergedFiles;
+  private String currLine;
 
-  public RecoverMergeTask(String storageGroupDir, MergeCallback callback, String taskName,
-      boolean fullMerge) throws IOException {
-    super(null, null, storageGroupDir, callback, taskName, fullMerge);
-  }
-
-  public void recoverMerge(boolean continueMerge) throws IOException {
-    File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
-    if (!logFile.exists()) {
-      logger.info("{} no merge.log, merge recovery ends", taskName);
-      return;
-    }
-    mergeSeqFiles = new ArrayList<>();
-    mergeUnseqFiles = new ArrayList<>();
-    long startTime = System.currentTimeMillis();
-
-    Status status = determineStatus(logFile);
-    if (logger.isInfoEnabled()) {
-      logger.info("{} merge recovery status determined: {} after {}ms", taskName, status,
-          (System.currentTimeMillis() - startTime));
-    }
-    switch (status) {
-      case NONE:
-        logFile.delete();
-        break;
-      case FILES_LOGGED:
-        if (continueMerge) {
-          mergeLogger = new MergeLogger(storageGroupDir);
-          truncateFiles();
-          recoverChunkCounts();
-          MergeSeriesTask mergeSeriesTask = new MergeSeriesTask(mergedChunkCnt, unmergedChunkCnt,
-              unmergedChunkStartTimes, taskName, mergeLogger, resource, fullMerge, unmergedPaths);
-          mergeSeriesTask.mergeSeries();
-
-          MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
-              unmergedChunkStartTimes, mergeLogger, resource, resource.getSeqFiles());
-          mergeFileTask.mergeFiles();
-        }
-        cleanUp(continueMerge);
-        break;
-      case ALL_TS_MERGED:
-        if (continueMerge) {
-          mergeLogger = new MergeLogger(storageGroupDir);
-          truncateFiles();
-          recoverChunkCounts();
-          resource.setSeqFiles(unmergedFiles);
-          MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
-              unmergedChunkStartTimes, mergeLogger, resource, unmergedFiles);
-          mergeFileTask.mergeFiles();
-        } else {
-          // NOTICE: although some of the seqFiles may have been truncated, later TsFile recovery
-          // will recover them, so they are not a concern here
-          truncateFiles();
-        }
-        cleanUp(continueMerge);
-        break;
-      case MERGE_END:
-        cleanUp(continueMerge);
-        break;
-    }
-    if (logger.isInfoEnabled()) {
-      logger.info("{} merge recovery ends after {}ms", taskName,
-          (System.currentTimeMillis() - startTime));
-    }
-  }
-
-
-  // scan metadata to compute how many chunks are merged/unmerged so at last we can decide to
-  // move the merged chunks or the unmerged chunks
-  private void recoverChunkCounts() throws IOException {
-    logger.debug("{} recovering chunk counts", taskName);
-    for (TsFileResource tsFileResource : resource.getSeqFiles()) {
-      RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(tsFileResource);
-      mergeFileWriter.makeMetadataVisible();
-      unmergedChunkStartTimes.put(tsFileResource, new HashMap<>());
-      for(Path path : mergedPaths) {
-        recoverChunkCounts(path, tsFileResource, mergeFileWriter);
-      }
-    }
-  }
-
-  private void recoverChunkCounts(Path path, TsFileResource tsFileResource,
-      RestorableTsFileIOWriter mergeFileWriter) throws IOException {
-    unmergedChunkStartTimes.get(tsFileResource).put(path, new ArrayList<>());
-
-    List<ChunkMetaData> seqFileChunks = resource.queryChunkMetadata(path, tsFileResource);
-    List<ChunkMetaData> mergeFileChunks =
-        mergeFileWriter.getVisibleMetadataList(path.getDevice(), path.getMeasurement(), null);
-    mergedChunkCnt.compute(tsFileResource, (k, v) -> v == null ? mergeFileChunks.size() :
-        v + mergeFileChunks.size());
-    int seqIndex = 0;
-    int mergeIndex = 0;
-    int unmergedCnt = 0;
-    while (seqIndex < seqFileChunks.size() && mergeIndex < mergeFileChunks.size()) {
-      ChunkMetaData seqChunk = seqFileChunks.get(seqIndex);
-      ChunkMetaData mergedChunk = mergeFileChunks.get(mergeIndex);
-      if (seqChunk.getStartTime() < mergedChunk.getStartTime()) {
-        // this seqChunk is unmerged
-        unmergedCnt ++;
-        seqIndex ++;
-        unmergedChunkStartTimes.get(tsFileResource).get(path).add(seqChunk.getStartTime());
-      } else if (mergedChunk.getStartTime() <= seqChunk.getStartTime() &&
-          seqChunk.getStartTime() <= mergedChunk.getEndTime()) {
-        // this seqChunk is merged
-        seqIndex ++;
-      } else {
-        // seqChunk.startTime > mergeChunk.endTime, find next mergedChunk that may cover the
-        // seqChunk
-        mergeIndex ++;
-      }
-    }
-    int finalUnmergedCnt = unmergedCnt;
-    unmergedChunkCnt.compute(tsFileResource, (k, v) -> v == null ? finalUnmergedCnt :
-        v + finalUnmergedCnt);
-  }
-
-  private void truncateFiles() throws IOException {
-    logger.debug("{} truncating {} files", taskName, fileLastPositions.size());
-    for (Entry<File, Long> entry : fileLastPositions.entrySet()) {
-      File file = entry.getKey();
-      Long lastPosition = entry.getValue();
-      if (file.exists() && file.length() != lastPosition) {
-        try (FileInputStream fileInputStream = new FileInputStream(file)) {
-          FileChannel channel = fileInputStream.getChannel();
-          channel.truncate(lastPosition);
-          channel.close();
-        }
-      }
-    }
+  public LogAnalyzer(MergeResource resource, String taskName, File logFile) {
+    this.resource = resource;
+    this.taskName = taskName;
+    this.logFile = logFile;
   }
 
-  private Status determineStatus(File logFile) throws IOException {
+  public Status analyze() throws IOException {
     Status status = Status.NONE;
     try (BufferedReader bufferedReader =
         new BufferedReader(new FileReader(logFile))) {
@@ -199,8 +73,6 @@ public class RecoverMergeTask extends MergeTask {
         }
         if (currLine.equals(MergeLogger.STR_MERGE_START)) {
           status = Status.FILES_LOGGED;
-          resource.setSeqFiles(mergeSeqFiles);
-          resource.setUnseqFiles(mergeUnseqFiles);
           for (TsFileResource seqFile : resource.getSeqFiles()) {
             File mergeFile = new File(seqFile.getFile().getPath() + MergeTask.MERGE_SUFFIX);
             fileLastPositions.put(mergeFile, 0L);
@@ -223,6 +95,7 @@ public class RecoverMergeTask extends MergeTask {
 
   private void analyzeSeqFiles(BufferedReader bufferedReader) throws IOException {
     long startTime = System.currentTimeMillis();
+    List<TsFileResource> mergeSeqFiles = new ArrayList<>();
     while ((currLine = bufferedReader.readLine()) != null) {
       if (currLine.equals(MergeLogger.STR_UNSEQ_FILES)) {
         break;
@@ -241,10 +114,12 @@ public class RecoverMergeTask extends MergeTask {
       logger.debug("{} found {} seq files after {}ms", taskName, mergeSeqFiles.size(),
           (System.currentTimeMillis() - startTime));
     }
+    resource.setSeqFiles(mergeSeqFiles);
   }
 
   private void analyzeUnseqFiles(BufferedReader bufferedReader) throws IOException {
     long startTime = System.currentTimeMillis();
+    List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
     while ((currLine = bufferedReader.readLine()) != null) {
       if (currLine.equals(MergeLogger.STR_MERGE_START)) {
         break;
@@ -263,6 +138,7 @@ public class RecoverMergeTask extends MergeTask {
       logger.debug("{} found {} seq files after {}ms", taskName, mergeUnseqFiles.size(),
           (System.currentTimeMillis() - startTime));
     }
+    resource.setUnseqFiles(mergeUnseqFiles);
   }
 
   private void analyzeMergedSeries(BufferedReader bufferedReader, List<Path> unmergedPaths) throws IOException {
@@ -294,7 +170,7 @@ public class RecoverMergeTask extends MergeTask {
     }
     if (logger.isDebugEnabled()) {
       logger.debug("{} found {} series have already been merged after {}ms", taskName,
-          mergeSeqFiles.size(), (System.currentTimeMillis() - startTime));
+          mergedPaths.size(), (System.currentTimeMillis() - startTime));
     }
   }
 
@@ -304,7 +180,7 @@ public class RecoverMergeTask extends MergeTask {
     int mergedCnt = 0;
     while ((currLine = bufferedReader.readLine()) != null) {
       if (currLine.equals(MergeLogger.STR_MERGE_END)) {
-         break;
+        break;
       }
       if (currLine.contains(MergeLogger.STR_START)) {
         String[] splits = currLine.split(" ");
@@ -327,11 +203,27 @@ public class RecoverMergeTask extends MergeTask {
     }
     if (logger.isDebugEnabled()) {
       logger.debug("{} found {} files have already been merged after {}ms", taskName,
-         mergedCnt, (System.currentTimeMillis() - startTime));
+          mergedCnt, (System.currentTimeMillis() - startTime));
     }
   }
 
-  enum Status {
+  public enum Status {
     NONE, FILES_LOGGED, ALL_TS_MERGED, MERGE_END
   }
+
+  public List<Path> getUnmergedPaths() {
+    return unmergedPaths;
+  }
+
+  public List<TsFileResource> getUnmergedFiles() {
+    return unmergedFiles;
+  }
+
+  public List<Path> getMergedPaths() {
+    return mergedPaths;
+  }
+
+  public Map<File, Long> getFileLastPositions() {
+    return fileLastPositions;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 43e3d0e..64dd4da 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -134,6 +134,7 @@ public class MergeTask implements Callable<Void> {
 
     File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
     if (executeCallback) {
+      // make sure merge.log is not deleted before unseqFiles are cleared
       callback.call(resource.getSeqFiles(), resource.getUnseqFiles(), logFile);
     } else {
       logFile.delete();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index ebcc425..809be34 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -19,20 +19,17 @@
 
 package org.apache.iotdb.db.engine.merge.task;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileReader;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
+import org.apache.iotdb.db.engine.merge.recover.LogAnalyzer;
+import org.apache.iotdb.db.engine.merge.recover.LogAnalyzer.Status;
 import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
-import org.apache.iotdb.db.utils.MergeUtils;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -44,16 +41,8 @@ public class RecoverMergeTask extends MergeTask {
 
   private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
 
-  private String currLine;
-  private List<TsFileResource> mergeSeqFiles = new ArrayList<>();
-  private List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
+  private LogAnalyzer analyzer;
 
-  private Map<File, Long> fileLastPositions = new HashMap<>();
-  private Map<File, Long> tempFileLastPositions = new HashMap<>();
-
-  private List<Path> unmergedPaths;
-  private List<Path> mergedPaths = new ArrayList<>();
-  private List<TsFileResource> unmergedFiles;
 
   public RecoverMergeTask(String storageGroupDir, MergeCallback callback, String taskName,
       boolean fullMerge) throws IOException {
@@ -66,11 +55,10 @@ public class RecoverMergeTask extends MergeTask {
       logger.info("{} no merge.log, merge recovery ends", taskName);
       return;
     }
-    mergeSeqFiles = new ArrayList<>();
-    mergeUnseqFiles = new ArrayList<>();
     long startTime = System.currentTimeMillis();
 
-    Status status = determineStatus(logFile);
+    analyzer = new LogAnalyzer(resource, taskName, logFile);
+    Status status = analyzer.analyze();
     if (logger.isInfoEnabled()) {
       logger.info("{} merge recovery status determined: {} after {}ms", taskName, status,
           (System.currentTimeMillis() - startTime));
@@ -80,39 +68,16 @@ public class RecoverMergeTask extends MergeTask {
         logFile.delete();
         break;
       case FILES_LOGGED:
-        if (continueMerge) {
-          mergeLogger = new MergeLogger(storageGroupDir);
-          truncateFiles();
-          recoverChunkCounts();
-          MergeSeriesTask mergeSeriesTask = new MergeSeriesTask(mergedChunkCnt, unmergedChunkCnt,
-              unmergedChunkStartTimes, taskName, mergeLogger, resource, fullMerge, unmergedPaths);
-          mergeSeriesTask.mergeSeries();
-
-          MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
-              unmergedChunkStartTimes, mergeLogger, resource, resource.getSeqFiles());
-          mergeFileTask.mergeFiles();
-        }
-        cleanUp(continueMerge);
+        resumeAfterFilesLogged(continueMerge);
         break;
       case ALL_TS_MERGED:
-        if (continueMerge) {
-          mergeLogger = new MergeLogger(storageGroupDir);
-          truncateFiles();
-          recoverChunkCounts();
-          resource.setSeqFiles(unmergedFiles);
-          MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
-              unmergedChunkStartTimes, mergeLogger, resource, unmergedFiles);
-          mergeFileTask.mergeFiles();
-        } else {
-          // NOTICE: although some of the seqFiles may have been truncated, later TsFile recovery
-          // will recover them, so they are not a concern here
-          truncateFiles();
-        }
-        cleanUp(continueMerge);
+        resumeAfterAllTsMerged(continueMerge);
         break;
       case MERGE_END:
         cleanUp(continueMerge);
         break;
+      default:
+        throw new UnsupportedOperationException(taskName + " found unrecognized status " + status);
     }
     if (logger.isInfoEnabled()) {
       logger.info("{} merge recovery ends after {}ms", taskName,
@@ -120,6 +85,40 @@ public class RecoverMergeTask extends MergeTask {
     }
   }
 
+  private void resumeAfterFilesLogged(boolean continueMerge) throws IOException {
+    if (continueMerge) {
+      resumeMerge();
+      MergeSeriesTask mergeSeriesTask = new MergeSeriesTask(mergedChunkCnt, unmergedChunkCnt,
+          unmergedChunkStartTimes, taskName, mergeLogger, resource, fullMerge, analyzer.getUnmergedPaths());
+      mergeSeriesTask.mergeSeries();
+
+      MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
+          unmergedChunkStartTimes, mergeLogger, resource, resource.getSeqFiles());
+      mergeFileTask.mergeFiles();
+    }
+    cleanUp(continueMerge);
+  }
+
+  private void resumeAfterAllTsMerged(boolean continueMerge) throws IOException {
+    if (continueMerge) {
+      resumeMerge();
+      MergeFileTask mergeFileTask = new MergeFileTask(taskName,mergedChunkCnt, unmergedChunkCnt,
+          unmergedChunkStartTimes, mergeLogger, resource, analyzer.getUnmergedFiles());
+      mergeFileTask.mergeFiles();
+    } else {
+      // NOTICE: although some of the seqFiles may have been truncated, later TsFile recovery
+      // will recover them, so they are not a concern here
+      truncateFiles();
+    }
+    cleanUp(continueMerge);
+  }
+
+  private void resumeMerge() throws IOException {
+    mergeLogger = new MergeLogger(storageGroupDir);
+    truncateFiles();
+    recoverChunkCounts();
+  }
+
 
   // scan metadata to compute how many chunks are merged/unmerged so at last we can decide to
   // move the merged chunks or the unmerged chunks
@@ -129,7 +128,7 @@ public class RecoverMergeTask extends MergeTask {
       RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(tsFileResource);
       mergeFileWriter.makeMetadataVisible();
       unmergedChunkStartTimes.put(tsFileResource, new HashMap<>());
-      for(Path path : mergedPaths) {
+      for(Path path : analyzer.getMergedPaths()) {
         recoverChunkCounts(path, tsFileResource, mergeFileWriter);
       }
     }
@@ -171,8 +170,8 @@ public class RecoverMergeTask extends MergeTask {
   }
 
   private void truncateFiles() throws IOException {
-    logger.debug("{} truncating {} files", taskName, fileLastPositions.size());
-    for (Entry<File, Long> entry : fileLastPositions.entrySet()) {
+    logger.debug("{} truncating {} files", taskName, analyzer.getFileLastPositions().size());
+    for (Entry<File, Long> entry : analyzer.getFileLastPositions().entrySet()) {
       File file = entry.getKey();
       Long lastPosition = entry.getValue();
       if (file.exists() && file.length() != lastPosition) {
@@ -185,153 +184,7 @@ public class RecoverMergeTask extends MergeTask {
     }
   }
 
-  private Status determineStatus(File logFile) throws IOException {
-    Status status = Status.NONE;
-    try (BufferedReader bufferedReader =
-        new BufferedReader(new FileReader(logFile))) {
-      currLine = bufferedReader.readLine();
-      if (currLine != null) {
-        if (currLine.equals(MergeLogger.STR_SEQ_FILES)) {
-          analyzeSeqFiles(bufferedReader);
-        }
-        if (currLine.equals(MergeLogger.STR_UNSEQ_FILES)) {
-          analyzeUnseqFiles(bufferedReader);
-        }
-        if (currLine.equals(MergeLogger.STR_MERGE_START)) {
-          status = Status.FILES_LOGGED;
-          resource.setSeqFiles(mergeSeqFiles);
-          resource.setUnseqFiles(mergeUnseqFiles);
-          for (TsFileResource seqFile : resource.getSeqFiles()) {
-            File mergeFile = new File(seqFile.getFile().getPath() + MergeTask.MERGE_SUFFIX);
-            fileLastPositions.put(mergeFile, 0L);
-          }
-          unmergedPaths = MergeUtils.collectPaths(resource);
-          analyzeMergedSeries(bufferedReader, unmergedPaths);
-        }
-        if (currLine.equals(MergeLogger.STR_ALL_TS_END)) {
-          status = Status.ALL_TS_MERGED;
-          unmergedFiles = resource.getSeqFiles();
-          analyzeMergedFiles(bufferedReader);
-        }
-        if (currLine.equals(MergeLogger.STR_MERGE_END)) {
-          status = Status.MERGE_END;
-        }
-      }
-    }
-    return status;
-  }
 
-  private void analyzeSeqFiles(BufferedReader bufferedReader) throws IOException {
-    long startTime = System.currentTimeMillis();
-    while ((currLine = bufferedReader.readLine()) != null) {
-      if (currLine.equals(MergeLogger.STR_UNSEQ_FILES)) {
-        break;
-      }
-      Iterator<TsFileResource> iterator = resource.getSeqFiles().iterator();
-      while (iterator.hasNext()) {
-        TsFileResource seqFile = iterator.next();
-        if (seqFile.getFile().getAbsolutePath().equals(currLine)) {
-          mergeSeqFiles.add(seqFile);
-          iterator.remove();
-          break;
-        }
-      }
-    }
-    if (logger.isDebugEnabled()) {
-      logger.debug("{} found {} seq files after {}ms", taskName, mergeSeqFiles.size(),
-          (System.currentTimeMillis() - startTime));
-    }
-  }
 
-  private void analyzeUnseqFiles(BufferedReader bufferedReader) throws IOException {
-    long startTime = System.currentTimeMillis();
-    while ((currLine = bufferedReader.readLine()) != null) {
-      if (currLine.equals(MergeLogger.STR_MERGE_START)) {
-        break;
-      }
-      Iterator<TsFileResource> iterator = resource.getUnseqFiles().iterator();
-      while (iterator.hasNext()) {
-        TsFileResource unseqFile = iterator.next();
-        if (unseqFile.getFile().getAbsolutePath().equals(currLine)) {
-          mergeUnseqFiles.add(unseqFile);
-          iterator.remove();
-          break;
-        }
-      }
-    }
-    if (logger.isDebugEnabled()) {
-      logger.debug("{} found {} seq files after {}ms", taskName, mergeUnseqFiles.size(),
-          (System.currentTimeMillis() - startTime));
-    }
-  }
 
-  private void analyzeMergedSeries(BufferedReader bufferedReader, List<Path> unmergedPaths) throws IOException {
-    Path currTS = null;
-    long startTime = System.currentTimeMillis();
-    while ((currLine = bufferedReader.readLine()) != null) {
-      if (currLine.equals(MergeLogger.STR_ALL_TS_END)) {
-        break;
-      }
-      if (currLine.contains(MergeLogger.STR_START)) {
-        // a TS starts to merge
-        String[] splits = currLine.split(" ");
-        currTS = new Path(splits[0]);
-        tempFileLastPositions.clear();
-      } else if (!currLine.contains(MergeLogger.STR_END)) {
-        // file position
-        String[] splits = currLine.split(" ");
-        File file = new File(splits[0]);
-        Long position = Long.parseLong(splits[1]);
-        tempFileLastPositions.put(file, position);
-      } else {
-        // a TS ends merging
-        unmergedPaths.remove(currTS);
-        for (Entry<File, Long> entry : tempFileLastPositions.entrySet()) {
-          fileLastPositions.put(entry.getKey(), entry.getValue());
-        }
-        mergedPaths.add(currTS);
-      }
-    }
-    if (logger.isDebugEnabled()) {
-      logger.debug("{} found {} series have already been merged after {}ms", taskName,
-          mergeSeqFiles.size(), (System.currentTimeMillis() - startTime));
-    }
-  }
-
-  private void analyzeMergedFiles(BufferedReader bufferedReader) throws IOException {
-    File currFile = null;
-    long startTime = System.currentTimeMillis();
-    int mergedCnt = 0;
-    while ((currLine = bufferedReader.readLine()) != null) {
-      if (currLine.equals(MergeLogger.STR_MERGE_END)) {
-         break;
-      }
-      if (currLine.contains(MergeLogger.STR_START)) {
-        String[] splits = currLine.split(" ");
-        currFile = new File(splits[0]);
-        Long lastPost = Long.parseLong(splits[2]);
-        fileLastPositions.put(currFile, lastPost);
-      } else if (currLine.contains(MergeLogger.STR_END)) {
-        fileLastPositions.remove(currFile);
-        String seqFilePath = currFile.getAbsolutePath().replace(MergeTask.MERGE_SUFFIX, "");
-        Iterator<TsFileResource> unmergedFileIter = unmergedFiles.iterator();
-        while (unmergedFileIter.hasNext()) {
-          TsFileResource seqFile = unmergedFileIter.next();
-          if (seqFile.getFile().getAbsolutePath().equals(seqFilePath)) {
-            mergedCnt ++;
-            unmergedFileIter.remove();
-            break;
-          }
-        }
-      }
-    }
-    if (logger.isDebugEnabled()) {
-      logger.debug("{} found {} files have already been merged after {}ms", taskName,
-         mergedCnt, (System.currentTimeMillis() - startTime));
-    }
-  }
-
-  enum Status {
-    NONE, FILES_LOGGED, ALL_TS_MERGED, MERGE_END
-  }
 }