You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/09/02 09:50:12 UTC

[incubator-iotdb] 02/03: complete load tsfiles module

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 1785cab5b4244c5251a9b43113b064e88913c5dd
Author: lta <li...@163.com>
AuthorDate: Mon Sep 2 17:22:39 2019 +0800

    complete load tsfiles module
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  11 +
 .../iotdb/db/engine/merge/task/MergeFileTask.java  |  23 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 247 +++++++++++++++++++--
 .../db/engine/storagegroup/TsFileResource.java     |   4 +
 .../iotdb/db/sync/receiver/load/FileLoader.java    |  43 +++-
 .../sender/recover/ISyncSenderLogAnalyzer.java     |   3 +-
 .../sync/sender/recover/SyncSenderLogAnalyzer.java |   7 +-
 .../sync/sender/transfer/DataTransferManager.java  |   2 +-
 .../sender/recover/SyncSenderLogAnalyzerTest.java  |  25 +++
 9 files changed, 338 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 3e36516..c96ef2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.StorageEngineFailureException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -337,4 +338,14 @@ public class StorageEngine implements IService {
     return true;
   }
 
+
+  public void loadNewTsFile(File newTsFile, TsFileResource resource)
+      throws TsFileProcessorException {
+    processorMap.get(newTsFile.getParentFile().getName()).loadNewTsFile(newTsFile, resource);
+  }
+
+  public void deleteTsfile(File deletedTsfile){
+    processorMap.get(deletedTsfile.getParentFile().getName()).deleteTsfile(deletedTsfile);
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index cf1f62d..68d3141 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.engine.merge.task;
 
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -148,6 +151,12 @@ class MergeFileTask {
       logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);
 
       newFileWriter.getFile().delete();
+
+      File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
+      FileUtils.moveFile(seqFile.getFile(), nextMergeVersionFile);
+      FileUtils.moveFile(new File(seqFile.getFile(), TsFileResource.RESOURCE_SUFFIX),
+          new File(nextMergeVersionFile, TsFileResource.RESOURCE_SUFFIX));
+      seqFile.setFile(nextMergeVersionFile);
     } finally {
       seqFile.getMergeQueryLock().writeLock().unlock();
     }
@@ -209,12 +218,24 @@ class MergeFileTask {
       TsFileMetaDataCache.getInstance().remove(seqFile);
       DeviceMetaDataCache.getInstance().remove(seqFile);
       seqFile.getFile().delete();
-      FileUtils.moveFile(fileWriter.getFile(), seqFile.getFile());
+
+      File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
+      FileUtils.moveFile(fileWriter.getFile(), nextMergeVersionFile);
+      FileUtils.moveFile(new File(seqFile.getFile(), TsFileResource.RESOURCE_SUFFIX),
+          new File(nextMergeVersionFile, TsFileResource.RESOURCE_SUFFIX));
+      seqFile.setFile(nextMergeVersionFile);
     } finally {
       seqFile.getMergeQueryLock().writeLock().unlock();
     }
   }
 
+  private File getNextMergeVersionFile(File seqFile) {
+    String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "").split("-");
+    int mergeVersion = Integer.parseInt(splits[2]) + 1;
+    return new File(seqFile.getParentFile(),
+        splits[0] + "-" + splits[1] + "-" + mergeVersion + TSFILE_SUFFIX);
+  }
+
   private long writeUnmergedChunks(List<Long> chunkStartTimes,
       List<ChunkMetaData> chunkMetaDataList, TsFileSequenceReader reader,
       RestorableTsFileIOWriter fileWriter) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1e41c57..4da8e30 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
@@ -67,17 +68,17 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.JobFileManager;
-import org.apache.iotdb.rpc.TSStatusType;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
 import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
+import org.apache.iotdb.rpc.TSStatusType;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.Schema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -206,7 +207,8 @@ public class StorageGroupProcessor {
 
     try {
       // collect TsFiles from sequential and unsequential data directory
-      List<TsFileResource> seqTsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
+      List<TsFileResource> seqTsFiles = getAllFiles(
+          DirectoryManager.getInstance().getAllSequenceFileFolders());
       List<TsFileResource> unseqTsFiles =
           getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
 
@@ -222,7 +224,8 @@ public class StorageGroupProcessor {
           storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
           IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
       logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
-      recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
+      recoverMergeTask
+          .recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
       if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
         mergingMods.delete();
       }
@@ -297,7 +300,7 @@ public class StorageGroupProcessor {
   }
 
   // TsFileNameComparator compares TsFiles by the version number in its name
-  // ({systemTime}-{versionNum}.tsfile)
+  // ({systemTime}-{versionNum}-{mergeNum}.tsfile)
   public int compareFileName(File o1, File o2) {
     String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split("-");
     String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split("-");
@@ -397,7 +400,8 @@ public class StorageGroupProcessor {
     boolean result = tsFileProcessor.insertBatch(batchInsertPlan, indexes, results);
 
     // try to update the latest time of the device of this tsRecord
-    if (result && latestTimeForEachDevice.get(batchInsertPlan.getDeviceId()) < batchInsertPlan.getMaxTime()) {
+    if (result && latestTimeForEachDevice.get(batchInsertPlan.getDeviceId()) < batchInsertPlan
+        .getMaxTime()) {
       latestTimeForEachDevice.put(batchInsertPlan.getDeviceId(), batchInsertPlan.getMaxTime());
     }
 
@@ -472,8 +476,9 @@ public class StorageGroupProcessor {
           e);
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
     } catch (IOException e) {
-      logger.error("meet IOException when creating TsFileProcessor, change system mode to read-only",
-          e);
+      logger
+          .error("meet IOException when creating TsFileProcessor, change system mode to read-only",
+              e);
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
     }
     return tsFileProcessor;
@@ -490,7 +495,7 @@ public class StorageGroupProcessor {
     new File(baseDir, storageGroupName).mkdirs();
 
     String filePath = Paths.get(baseDir, storageGroupName,
-        System.currentTimeMillis() + "-" + versionController.nextVersion()).toString()
+        System.currentTimeMillis() + "-" + versionController.nextVersion()).toString() + "-0"
         + TSFILE_SUFFIX;
 
     if (sequence) {
@@ -552,7 +557,7 @@ public class StorageGroupProcessor {
       this.latestFlushedTimeForEachDevice.clear();
       this.latestTimeForEachDevice.clear();
     } catch (IOException e) {
-      logger.error("Cannot delete files in storage group {}, because", storageGroupName, e);
+      logger.error("Cannot delete files in storage group {}", storageGroupName, e);
     } finally {
       writeUnlock();
     }
@@ -608,7 +613,8 @@ public class StorageGroupProcessor {
           deviceId, measurementId, context);
       List<TsFileResource> unseqResources = getFileReSourceListForQuery(unSequenceFileList,
           deviceId, measurementId, context);
-      QueryDataSource dataSource =  new QueryDataSource(new Path(deviceId, measurementId), seqResources, unseqResources);
+      QueryDataSource dataSource = new QueryDataSource(new Path(deviceId, measurementId),
+          seqResources, unseqResources);
       // used files should be added before mergeLock is unlocked, or they may be deleted by
       // running merge
       // is null only in tests
@@ -855,8 +861,10 @@ public class StorageGroupProcessor {
         mergeResource.setCacheDeviceMeta(true);
 
         MergeTask mergeTask = new MergeTask(mergeResource, storageGroupSysDir.getPath(),
-            this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(), storageGroupName);
-        mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
+            this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(),
+            storageGroupName);
+        mergingModification = new ModificationFile(
+            storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
         MergeManager.getINSTANCE().submitMainTask(mergeTask);
         if (logger.isInfoEnabled()) {
           logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
@@ -950,6 +958,219 @@ public class StorageGroupProcessor {
     logger.info("{} a merge task ends", storageGroupName);
   }
 
+  /**
+   * Load a new tsfile to storage group processor
+   *
+   * Firstly, determine the loading type of the file, i.e. whether it needs to be loaded into the
+   * sequence list or the unsequence list.
+   *
+   * Secondly, execute the loading process by the type.
+   *
+   * Finally, update the latestTimeForEachDevice and latestFlushedTimeForEachDevice.
+   *
+   * @param newTsFile new tsfile
+   * @param newTsFileResource tsfile resource
+   * @UsedBy sync module.
+   */
+  public void loadNewTsFile(File newTsFile, TsFileResource newTsFileResource)
+      throws TsFileProcessorException {
+    writeLock();
+    mergeLock.writeLock().lock();
+    try {
+      boolean isOverlap = false;
+      int preIndex = -1, subsequentIndex = sequenceFileList.size();
+      // check new tsfile
+      outer:
+      for (int i = 0; i < sequenceFileList.size(); i++) {
+        if (i == sequenceFileList.size() - 1 && sequenceFileList.get(i).getEndTimeMap().isEmpty()) {
+          continue;
+        }
+        int preCnt = 0, subsequenceCnt = 0;
+        for (String device : newTsFileResource.getStartTimeMap().keySet()) {
+          if (sequenceFileList.get(i).getStartTimeMap().containsKey(device)) {
+            long startTime1 = sequenceFileList.get(i).getStartTimeMap().get(device);
+            long endTime1 = sequenceFileList.get(i).getEndTimeMap().get(device);
+            long startTime2 = newTsFileResource.getStartTimeMap().get(device);
+            long endTime2 = newTsFileResource.getEndTimeMap().get(device);
+            if (startTime1 > endTime2) {
+              subsequenceCnt++;
+            } else if (startTime2 > endTime1) {
+              preCnt++;
+            } else {
+              isOverlap = true;
+              break outer;
+            }
+          }
+        }
+        if (preCnt != 0 && subsequenceCnt != 0) {
+          isOverlap = true;
+          break;
+        }
+        if (preCnt == 0 && subsequenceCnt != 0) {
+          subsequentIndex = i;
+          break;
+        }
+        if (preCnt != 0 && subsequenceCnt == 0) {
+          preIndex = i;
+        }
+      }
+
+      // loading tsfile by type
+      if (isOverlap) {
+        loadTsFileByType(-1, newTsFile, newTsFileResource, unSequenceFileList.size());
+      } else {
+        if (subsequentIndex != sequenceFileList.size()) {
+          loadTsFileByType(1, newTsFile, newTsFileResource, subsequentIndex);
+        } else {
+          if (preIndex != -1) {
+            loadTsFileByType(1, newTsFile, newTsFileResource, preIndex + 1);
+          } else {
+            loadTsFileByType(1, newTsFile, newTsFileResource, sequenceFileList.size());
+          }
+        }
+      }
+
+      // update latest time map
+      updateLatestTimeMap(newTsFileResource);
+    } catch (TsFileProcessorException e) {
+      logger.error("Failed to append the tsfile {} to storage group processor {}.",
+          newTsFile.getAbsolutePath(), newTsFile.getParentFile().getName());
+      throw new TsFileProcessorException(e);
+    } finally {
+      mergeLock.writeLock().unlock();
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Update latest time in latestTimeForEachDevice and latestFlushedTimeForEachDevice.
+   *
+   * @UsedBy sync module
+   */
+  private void updateLatestTimeMap(TsFileResource newTsFileResource) {
+    for (Entry<String, Long> entry : newTsFileResource.getEndTimeMap().entrySet()) {
+      String device = entry.getKey();
+      long endTime = newTsFileResource.getEndTimeMap().get(device);
+      if(!latestTimeForEachDevice.containsKey(device) || latestTimeForEachDevice.get(device) < endTime){
+        latestTimeForEachDevice.put(device, endTime);
+      }
+      if(!latestFlushedTimeForEachDevice.containsKey(device) || latestFlushedTimeForEachDevice.get(device) < endTime){
+        latestFlushedTimeForEachDevice.put(device, endTime);
+      }
+    }
+  }
+
+  /**
+   * Execute the loading process by the type.
+   *
+   * @param type load type: 1 sequence tsfile ; -1 unsequence tsfile
+   * @param tsFile tsfile to be loaded
+   * @param tsFileResource tsfile resource to be loaded
+   * @param index the index in sequenceFileList/unSequenceFileList
+   * @UsedBy sync module
+   */
+  private void loadTsFileByType(int type, File tsFile, TsFileResource tsFileResource, int index)
+      throws TsFileProcessorException {
+    File targetFile;
+    if (type == -1) {
+      targetFile =
+          new File(tsFile.getParentFile().getParentFile().getParentFile().getParentFile()
+              .getParentFile(), IoTDBConstant.UNSEQUENCE_FLODER_NAME
+              + File.separatorChar + tsFile.getParentFile().getName() + File.separatorChar
+              + tsFile.getName());
+      tsFileResource.setFile(targetFile);
+      unSequenceFileList.add(index, tsFileResource);
+    } else {
+      targetFile =
+          new File(tsFile.getParentFile().getParentFile().getParentFile().getParentFile()
+              .getParentFile(), IoTDBConstant.SEQUENCE_FLODER_NAME
+              + File.separatorChar + tsFile.getParentFile().getName() + File.separatorChar
+              + tsFile.getName());
+      tsFileResource.setFile(targetFile);
+      sequenceFileList.add(index, tsFileResource);
+    }
+
+    // move file from sync dir to data dir
+    if (!targetFile.getParentFile().exists()) {
+      targetFile.getParentFile().mkdirs();
+    }
+    if (!new File(tsFile, TsFileResource.RESOURCE_SUFFIX).exists() && !new File(
+        targetFile, TsFileResource.RESOURCE_SUFFIX).exists()) {
+      throw new TsFileProcessorException(
+          String
+              .format("The new .resource file {%s} to be loaded does not exist.",
+                  tsFile.getAbsolutePath()));
+    }
+    if (!new File(targetFile, TsFileResource.RESOURCE_SUFFIX).exists() && !new File(
+        tsFile, TsFileResource.RESOURCE_SUFFIX)
+        .renameTo(new File(targetFile, TsFileResource.RESOURCE_SUFFIX))) {
+      throw new TsFileProcessorException(String.format(
+          "File renaming failed when loading .resource file. Origin: %s, Target: %s",
+          new File(tsFile, TsFileResource.RESOURCE_SUFFIX).getAbsolutePath(),
+          new File(targetFile, TsFileResource.RESOURCE_SUFFIX).getAbsolutePath()));
+    }
+    if (!tsFile.exists() && !targetFile.exists()) {
+      throw new TsFileProcessorException(String
+          .format("The new tsfile {%s} to be loaded does not exist.",
+              tsFile.getAbsolutePath()));
+    }
+    if (!targetFile.exists() && !tsFile.renameTo(targetFile)) {
+      throw new TsFileProcessorException(String.format(
+          "File renaming failed when loading tsfile. Origin: %s, Target: %s",
+          tsFile.getAbsolutePath(), targetFile.getAbsolutePath()));
+    }
+  }
+
+  /**
+   * Delete the tsfile if it exists.
+   *
+   * Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList.
+   *
+   * Secondly, delete the tsfile and .resource file.
+   *
+   * @param deletedTsfile tsfile to be deleted
+   * @UsedBy sync module.
+   */
+  public void deleteTsfile(File deletedTsfile) {
+    writeLock();
+    mergeLock.writeLock().lock();
+    TsFileResource deletedTsFileResource = null;
+    try {
+      Iterator<TsFileResource> sequenceIterator = sequenceFileList.iterator();
+      while (sequenceIterator.hasNext()) {
+        TsFileResource sequenceResource = sequenceIterator.next();
+        if (sequenceResource.getFile().getName().equals(deletedTsfile.getName())) {
+          deletedTsFileResource = sequenceResource;
+          sequenceIterator.remove();
+          break;
+        }
+      }
+      if(deletedTsFileResource == null) {
+        Iterator<TsFileResource> unsequenceIterator = unSequenceFileList.iterator();
+        while (unsequenceIterator.hasNext()) {
+          TsFileResource unsequenceResource = unsequenceIterator.next();
+          if (unsequenceResource.getFile().getName().equals(deletedTsfile.getName())) {
+            deletedTsFileResource = unsequenceResource;
+            unsequenceIterator.remove();
+            break;
+          }
+        }
+      }
+    } finally {
+      mergeLock.writeLock().unlock();
+      writeUnlock();
+    }
+    if(deletedTsFileResource == null){
+      return;
+    }
+    deletedTsFileResource.getMergeQueryLock().writeLock().lock();
+    try {
+      deletedTsFileResource.getFile().delete();
+      new File(deletedTsFileResource.getFile(), TsFileResource.RESOURCE_SUFFIX).delete();
+    } finally {
+      deletedTsFileResource.getMergeQueryLock().writeLock().unlock();
+    }
+  }
 
   public TsFileProcessor getWorkSequenceTsFileProcessor() {
     return workSequenceTsFileProcessor;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 3a7af7e..1aa2136 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -189,6 +189,10 @@ public class TsFileResource {
     return modFile;
   }
 
+  public void setFile(File file) {
+    this.file = file;
+  }
+
   public boolean containsDevice(String deviceId) {
     return startTimeMap.containsKey(deviceId);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index 19f7909..6bd7d49 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.sync.receiver.load;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayDeque;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,10 +79,15 @@ public class FileLoader implements IFileLoader {
           }
         }
         if (!queue.isEmpty()) {
-          handleLoadTask(queue.poll());
+          LoadTask task = queue.poll();
+          try {
+            handleLoadTask(task);
+          }catch (IOException e){
+            LOGGER.error("Can not load task {}", task, e);
+          }
         }
       }
-    } catch (InterruptedException | IOException e) {
+    } catch (InterruptedException e) {
       LOGGER.error("Can not handle load task", e);
     }
   };
@@ -111,32 +119,39 @@ public class FileLoader implements IFileLoader {
   public void handleLoadTask(LoadTask task) throws IOException {
     switch (task.type) {
       case ADD:
-        loadDeletedFile(task.file);
+        loadNewTsfile(task.file);
         break;
       case DELETE:
-        loadNewTsfile(task.file);
+        loadDeletedFile(task.file);
         break;
       default:
         LOGGER.error("Wrong load task type {}", task.type);
     }
   }
 
-  private void loadDeletedFile(File file) throws IOException {
+  private void loadNewTsfile(File newTsFile) throws IOException {
     if (curType != LoadType.DELETE) {
       loadLog.startLoadDeletedFiles();
       curType = LoadType.DELETE;
     }
-    // TODO load deleted file
-    loadLog.finishLoadDeletedFile(file);
+    TsFileResource tsFileResource = new TsFileResource(
+        new File(newTsFile, TsFileResource.RESOURCE_SUFFIX));
+    tsFileResource.deSerialize();
+    try {
+      StorageEngine.getInstance().loadNewTsFile(newTsFile, tsFileResource);
+    } catch (TsFileProcessorException e) {
+      LOGGER.error("Can not load new tsfile {}", newTsFile.getAbsolutePath(), e);
+    }
+    loadLog.finishLoadDeletedFile(newTsFile);
   }
 
-  private void loadNewTsfile(File file) throws IOException {
+  private void loadDeletedFile(File deletedTsFile) throws IOException {
     if (curType != LoadType.ADD) {
       loadLog.startLoadTsFiles();
       curType = LoadType.ADD;
     }
-    // TODO load new tsfile
-    loadLog.finishLoadTsfile(file);
+    StorageEngine.getInstance().deleteTsfile(deletedTsFile);
+    loadLog.finishLoadTsfile(deletedTsFile);
   }
 
 
@@ -165,5 +180,13 @@ public class FileLoader implements IFileLoader {
       this.file = file;
       this.type = type;
     }
+
+    @Override
+    public String toString() {
+      return "LoadTask{" +
+          "file=" + file +
+          ", type=" + type +
+          '}';
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
index d0c09b1..3d7a356 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.sync.sender.recover;
 
+import java.io.IOException;
 import java.util.Set;
 
 /**
@@ -27,7 +28,7 @@ import java.util.Set;
  */
 public interface ISyncSenderLogAnalyzer {
 
-  void recover();
+  void recover() throws IOException;
 
   void loadLastLocalFiles(Set<String> lastLocalFiles);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
index af4f00c..6eaec50 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
@@ -26,6 +26,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,18 +34,20 @@ import org.slf4j.LoggerFactory;
 public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderLogAnalyzer.class);
+  private String senderPath;
   private File currentLocalFile;
   private File lastLocalFile;
   private File syncLogFile;
 
   public SyncSenderLogAnalyzer(String senderPath) {
+    this.senderPath = senderPath;
     this.currentLocalFile = new File(senderPath, Constans.CURRENT_LOCAL_FILE_NAME);
     this.lastLocalFile = new File(senderPath, Constans.LAST_LOCAL_FILE_NAME);
     this.syncLogFile = new File(senderPath, Constans.SYNC_LOG_NAME);
   }
 
   @Override
-  public void recover() {
+  public void recover() throws IOException {
     if (currentLocalFile.exists() && !lastLocalFile.exists()) {
       currentLocalFile.renameTo(lastLocalFile);
     } else {
@@ -57,6 +60,8 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
       lastLocalFiles.addAll(newFiles);
       clearLogger(lastLocalFiles);
     }
+    FileUtils.deleteDirectory(new File(senderPath, Constans.DATA_SNAPSHOT_NAME));
+    syncLogFile.delete();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index 4969e19..6331d2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -257,7 +257,7 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
-  private void checkRecovery() {
+  private void checkRecovery() throws IOException {
     new SyncSenderLogAnalyzer(config.getSenderFolderPath()).recover();
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
index 6b65776..c30eb8f 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
@@ -100,6 +100,31 @@ public class SyncSenderLogAnalyzerTest {
       assert lastFilesMap.get(entry.getKey()).size() == entry.getValue().size();
       assert lastFilesMap.get(entry.getKey()).containsAll(entry.getValue());
     }
+
+    // delete some files
+    assert !new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME).exists();
+    senderLogger = new SyncSenderLogger(
+        new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME));
+    manager.getValidFiles(dataDir);
+    assert !isEmpty(manager.getLastLocalFilesMap());
+    senderLogger.startSyncDeletedFilesName();
+    for(Set<File> newTsFiles:allFileList.values()){
+      for(File file: newTsFiles){
+        senderLogger.finishSyncDeletedFileName(file);
+      }
+    }
+    senderLogger.close();
+    // recover log
+    senderLogAnalyzer.recover();
+    manager.getValidFiles(dataDir);
+    assert isEmpty(manager.getLastLocalFilesMap());
+    assert isEmpty(manager.getDeletedFilesMap());
+    Map<String, Set<File>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
+    for (Entry<String, Set<File>> entry : allFileList.entrySet()) {
+      assert toBeSyncedFilesMap.containsKey(entry.getKey());
+      assert toBeSyncedFilesMap.get(entry.getKey()).size() == entry.getValue().size();
+      assert toBeSyncedFilesMap.get(entry.getKey()).containsAll(entry.getValue());
+    }
   }
 
   private boolean isEmpty(Map<String, Set<File>> sendingFileList) {