You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/08/22 03:14:15 UTC

[incubator-iotdb] 04/05: complete sync sender module

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit b15ec6bf1bc5dcd9779cfc6e1de5b9afbca69691
Author: lta <li...@163.com>
AuthorDate: Thu Aug 22 11:11:56 2019 +0800

    complete sync sender module
---
 .../org/apache/iotdb/db/sync/package-info.java     |  19 +
 .../db/sync/receiver/transfer/SyncServiceImpl.java | 302 ----------------
 .../apache/iotdb/db/sync/sender/conf/Constans.java |  15 +-
 .../db/sync/sender/manage/ISyncFileManager.java    |  24 ++
 .../db/sync/sender/manage/SyncFileManager.java     |  47 ++-
 .../sender/recover/ISyncSenderLogAnalyzer.java     |   9 +-
 .../db/sync/sender/recover/ISyncSenderLogger.java  |  26 ++
 .../sync/sender/recover/SyncSenderLogAnalyzer.java |   3 +-
 .../sync/sender/transfer/DataTransferManager.java  | 397 ++++++++++++---------
 .../sync/sender/transfer/IDataTransferManager.java |  42 ++-
 10 files changed, 377 insertions(+), 507 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/package-info.java b/server/src/main/java/org/apache/iotdb/db/sync/package-info.java
index 97a4ec5..f3110e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/package-info.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/package-info.java
@@ -17,4 +17,23 @@
  * under the License.
  */
 
+/**
+ * <p>
+ * Package Sync is a tool suite that periodically uploads persistent tsfiles from the sender disk to
+ * the receiver and loads them. With the merge module, write, update and delete operations can all
+ * be synced.
+ *
+ * On the sender side of the sync, the sync module is a separate process, independent of the IoTDB
+ * process. It can be started and closed through a separate script.
+ *
+ * On the receiver side of the sync, the sync module is embedded in the engine of IoTDB and is in
+ * the same process with IoTDB. The receiver module listens on a separate port. Before using it, a
+ * whitelist, expressed as a network segment, must be set up at the sync receiver. The receiver
+ * only accepts data transferred from senders located in the whitelist segment.
+ *
+ * Since the IoTDB system supports multiple directories of data files, every complete
+ * synchronization task is split into sub-tasks according to disks: hard links are needed in the
+ * execution process, and hard links cannot be created across disk partitions. The sub-tasks of a
+ * synchronization task are performed in turn, disk by disk.
+ */
 package org.apache.iotdb.db.sync;
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 02bb5b6..61ff2ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -391,305 +391,6 @@ public class SyncServiceImpl implements SyncService.Iface {
   }
 
   /**
-   * Get all tsfiles' info which are sent from sender, it is preparing for merging these data
-   */
-  public void getFileNodeInfo() throws IOException {
-    File dataFileRoot = new File(syncDataPath);
-    File[] files = dataFileRoot.listFiles();
-    int processedNum = 0;
-    for (File storageGroupPB : files) {
-      List<String> filesPath = new ArrayList<>();
-      File[] filesSG = storageGroupPB.listFiles();
-      for (File fileTF : filesSG) { // fileTF means TsFiles
-        Map<String, Long> startTimeMap = new HashMap<>();
-        Map<String, Long> endTimeMap = new HashMap<>();
-        TsFileSequenceReader reader = null;
-        try {
-          reader = new TsFileSequenceReader(fileTF.getPath());
-          Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-          Iterator<String> it = deviceIdMap.keySet().iterator();
-          while (it.hasNext()) {
-            String key = it.next();
-            TsDeviceMetadataIndex device = deviceIdMap.get(key);
-            startTimeMap.put(key, device.getStartTime());
-            endTimeMap.put(key, device.getEndTime());
-          }
-        } catch (IOException e) {
-          logger.error("Unable to read tsfile {}", fileTF.getPath());
-          throw new IOException(e);
-        } finally {
-          try {
-            if (reader != null) {
-              reader.close();
-            }
-          } catch (IOException e) {
-            logger.error("Cannot close tsfile stream {}", fileTF.getPath());
-            throw new IOException(e);
-          }
-        }
-        fileNodeStartTime.get().put(fileTF.getPath(), startTimeMap);
-        fileNodeEndTime.get().put(fileTF.getPath(), endTimeMap);
-        filesPath.add(fileTF.getPath());
-        processedNum++;
-        logger.info(String
-            .format("Get tsfile info has complete : %d/%d", processedNum, fileNum.get()));
-        fileNodeMap.get().put(storageGroupPB.getName(), filesPath);
-      }
-    }
-  }
-
-
-  /**
-   * It is to merge data. If data in the tsfile is new, append the tsfile to the storage group
-   * directly. If data in the tsfile is old, it has two strategy to merge.It depends on the
-   * possibility of updating historical data.
-   */
-  public void loadData() throws StorageEngineException {
-    syncDataPath = FilePathUtils.regularizePath(syncDataPath);
-    int processedNum = 0;
-    for (String storageGroup : fileNodeMap.get().keySet()) {
-      List<String> filesPath = fileNodeMap.get().get(storageGroup);
-      /**  before load external tsFile, it is necessary to order files in the same storage group **/
-      Collections.sort(filesPath, (o1, o2) -> {
-        Map<String, Long> startTimePath1 = fileNodeStartTime.get().get(o1);
-        Map<String, Long> endTimePath2 = fileNodeEndTime.get().get(o2);
-        for (Entry<String, Long> entry : endTimePath2.entrySet()) {
-          if (startTimePath1.containsKey(entry.getKey())) {
-            if (startTimePath1.get(entry.getKey()) > entry.getValue()) {
-              return 1;
-            } else {
-              return -1;
-            }
-          }
-        }
-        return 0;
-      });
-
-      for (String path : filesPath) {
-        // get startTimeMap and endTimeMap
-        Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path);
-        Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path);
-
-        // create a new fileNode
-        String header = syncDataPath;
-        String relativePath = path.substring(header.length());
-        TsFileResource fileNode = new TsFileResource(
-            new File(DirectoryManager.getInstance().getNextFolderIndexForSequenceFile() +
-                File.separator + relativePath), startTimeMap, endTimeMap
-        );
-        // call interface of load external file
-        try {
-          if (!STORAGE_GROUP_MANAGER.appendFileToStorageGroupProcessor(storageGroup, fileNode, path)) {
-            // it is a file with unsequence data
-            if (config.isUpdateHistoricalDataPossibility()) {
-              loadOldData(path);
-            } else {
-              List<String> overlapFiles = STORAGE_GROUP_MANAGER.getOverlapFiles(
-                  storageGroup,
-                  fileNode, uuid.get());
-              if (overlapFiles.isEmpty()) {
-                loadOldData(path);
-              } else {
-                loadOldData(path, overlapFiles);
-              }
-            }
-          }
-        } catch (StorageEngineException | IOException | ProcessorException e) {
-          logger.error("Can not load external file {}", path);
-          throw new StorageEngineException(e);
-        }
-        processedNum++;
-        logger.info(String
-            .format("Merging files has completed : %d/%d", processedNum, fileNum.get()));
-      }
-    }
-  }
-
-  /**
-   * Insert all data in the tsfile into IoTDB.
-   */
-  public void loadOldData(String filePath) throws IOException, ProcessorException {
-    Set<String> timeseriesSet = new HashSet<>();
-    TsFileSequenceReader reader = null;
-    QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
-    try {
-      /** use tsfile reader to get data **/
-      reader = new TsFileSequenceReader(filePath);
-      Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-      Iterator<Entry<String, TsDeviceMetadataIndex>> entryIterator = deviceIdMap.entrySet()
-          .iterator();
-      while (entryIterator.hasNext()) {
-        Entry<String, TsDeviceMetadataIndex> deviceMIEntry = entryIterator.next();
-        String deviceId = deviceMIEntry.getKey();
-        TsDeviceMetadataIndex deviceMI = deviceMIEntry.getValue();
-        TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
-        List<ChunkGroupMetaData> rowGroupMetadataList = deviceMetadata.getChunkGroupMetaDataList();
-        timeseriesSet.clear();
-        /** firstly, get all timeseries in the same device **/
-        for (ChunkGroupMetaData chunkGroupMetaData : rowGroupMetadataList) {
-          List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData
-              .getChunkMetaDataList();
-          for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
-            String measurementUID = chunkMetaData.getMeasurementUid();
-            measurementUID = deviceId + "." + measurementUID;
-            timeseriesSet.add(measurementUID);
-          }
-        }
-        /** Secondly, use tsFile Reader to form InsertPlan **/
-        ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
-        List<Path> paths = new ArrayList<>();
-        paths.clear();
-        for (String timeseries : timeseriesSet) {
-          paths.add(new Path(timeseries));
-        }
-        QueryExpression queryExpression = QueryExpression.create(paths, null);
-        QueryDataSet queryDataSet = readTsFile.query(queryExpression);
-        while (queryDataSet.hasNext()) {
-          RowRecord record = queryDataSet.next();
-          List<Field> fields = record.getFields();
-          List<String> measurementList = new ArrayList<>();
-          List<String> insertValues = new ArrayList<>();
-          for (int i = 0; i < fields.size(); i++) {
-            Field field = fields.get(i);
-            if (!field.isNull()) {
-              measurementList.add(paths.get(i).getMeasurement());
-              if (fields.get(i).getDataType() == TSDataType.TEXT) {
-                insertValues.add(String.format("'%s'", field.toString()));
-              } else {
-                insertValues.add(String.format("%s", field.toString()));
-              }
-            }
-          }
-          if (insertExecutor.insert(new InsertPlan(deviceId, record.getTimestamp(),
-              measurementList.toArray(new String[0]), insertValues.toArray(new String[0])))) {
-            throw new IOException("Inserting series data to IoTDB engine has failed.");
-          }
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Can not parse tsfile into SQL", e);
-      throw new IOException(e);
-    } catch (ProcessorException e) {
-      logger.error("Meet error while processing non-query.");
-      throw new ProcessorException(e);
-    } finally {
-      try {
-        if (reader != null) {
-          reader.close();
-        }
-      } catch (IOException e) {
-        logger.error("Cannot close file stream {}", filePath, e);
-      }
-    }
-  }
-
-  /**
-   * Insert those valid data in the tsfile into IoTDB
-   *
-   * @param overlapFiles:files which are conflict with the sync file
-   */
-  public void loadOldData(String filePath, List<String> overlapFiles)
-      throws IOException, ProcessorException {
-    Set<String> timeseriesList = new HashSet<>();
-    QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
-    Map<String, ReadOnlyTsFile> tsfilesReaders = openReaders(filePath, overlapFiles);
-    try {
-      TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
-      Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-      Iterator<String> it = deviceIdMap.keySet().iterator();
-      while (it.hasNext()) {
-        String deviceID = it.next();
-        TsDeviceMetadataIndex deviceMI = deviceIdMap.get(deviceID);
-        TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
-        List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
-            .getChunkGroupMetaDataList();
-        timeseriesList.clear();
-        /** firstly, get all timeseries in the same device **/
-        for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
-          List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData.getChunkMetaDataList();
-          for (ChunkMetaData timeSeriesChunkMetaData : chunkMetaDataList) {
-            String measurementUID = timeSeriesChunkMetaData.getMeasurementUid();
-            measurementUID = deviceID + "." + measurementUID;
-            timeseriesList.add(measurementUID);
-          }
-        }
-        reader.close();
-
-        /** secondly, use tsFile Reader to form SQL **/
-        ReadOnlyTsFile readOnlyTsFile = tsfilesReaders.get(filePath);
-        List<Path> paths = new ArrayList<>();
-        /** compare data with one timeseries in a round to get valid data **/
-        for (String timeseries : timeseriesList) {
-          paths.clear();
-          paths.add(new Path(timeseries));
-          Set<InsertPlan> originDataPoints = new HashSet<>();
-          QueryExpression queryExpression = QueryExpression.create(paths, null);
-          QueryDataSet queryDataSet = readOnlyTsFile.query(queryExpression);
-          Set<InsertPlan> newDataPoints = convertToInserPlans(queryDataSet, paths, deviceID);
-
-          /** get all data with the timeseries in all overlap files. **/
-          for (String overlapFile : overlapFiles) {
-            ReadOnlyTsFile readTsFileOverlap = tsfilesReaders.get(overlapFile);
-            QueryDataSet queryDataSetOverlap = readTsFileOverlap.query(queryExpression);
-            originDataPoints.addAll(convertToInserPlans(queryDataSetOverlap, paths, deviceID));
-          }
-
-          /** If there has no overlap data with the timeseries, inserting all data in the sync file **/
-          if (originDataPoints.isEmpty()) {
-            for (InsertPlan insertPlan : newDataPoints) {
-              if (insertExecutor.insert(insertPlan)) {
-                throw new IOException("Inserting series data to IoTDB engine has failed.");
-              }
-            }
-          } else {
-            /** Compare every data to get valid data **/
-            for (InsertPlan insertPlan : newDataPoints) {
-              if (!originDataPoints.contains(insertPlan)) {
-                if (insertExecutor.insert(insertPlan)) {
-                  throw new IOException("Inserting series data to IoTDB engine has failed.");
-                }
-              }
-            }
-          }
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Can not parse tsfile into SQL", e);
-      throw new IOException(e);
-    } catch (ProcessorException e) {
-      logger.error("Meet error while processing non-query.", e);
-      throw new ProcessorException(e);
-    } finally {
-      try {
-        closeReaders(tsfilesReaders);
-      } catch (IOException e) {
-        logger.error("Cannot close file stream {}", filePath, e);
-      }
-    }
-  }
-
-  private Set<InsertPlan> convertToInserPlans(QueryDataSet queryDataSet, List<Path> paths, String deviceID) throws IOException {
-    Set<InsertPlan> plans = new HashSet<>();
-    while (queryDataSet.hasNext()) {
-      RowRecord record = queryDataSet.next();
-      List<Field> fields = record.getFields();
-      /** get all data with the timeseries in the sync file **/
-      for (int i = 0; i < fields.size(); i++) {
-        Field field = fields.get(i);
-        String[] measurementList = new String[1];
-        if (!field.isNull()) {
-          measurementList[0] = paths.get(i).getMeasurement();
-          InsertPlan insertPlan = new InsertPlan(deviceID, record.getTimestamp(),
-              measurementList, new String[]{field.getDataType() == TSDataType.TEXT ? String.format("'%s'", field.toString())
-              : field.toString()});
-          plans.add(insertPlan);
-        }
-      }
-    }
-    return plans;
-  }
-
-  /**
    * Open all tsfile reader and cache
    */
   private Map<String, ReadOnlyTsFile> openReaders(String filePath, List<String> overlapFiles)
@@ -716,11 +417,8 @@ public class SyncServiceImpl implements SyncService.Iface {
    */
   @Override
   public void cleanUp() {
-    uuid.remove();
     fileNum.remove();
     fileNodeMap.remove();
-    fileNodeStartTime.remove();
-    fileNodeEndTime.remove();
     schemaFromSenderPath.remove();
     try {
       FileUtils.deleteDirectory(new File(syncFolderPath));
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index dfe3c52..dffa1e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -27,15 +27,22 @@ public class Constans {
   public static final String SYNC_SENDER = "sync-sender";
   public static final String SYNC_RECEIVER = "sync-receiver";
 
+  public static final String MESSAGE_DIGIT_NAME = "MD5";
+  public static final String SYNC_DIR_NAME_SEPARATOR = "_";
+
+  // sender section
+
   public static final String LOCK_FILE_NAME = "sync_lock";
+
   public static final String SCHEMA_POS_FILE_NAME = "sync_schema_pos";
+
   public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
+
   public static final String CURRENT_LOCAL_FILE_NAME = "current_local_files.txt";
+
   public static final String DATA_SNAPSHOT_NAME = "snapshot";
-  public static final String SYNC_LOG_NAME = "sync.log";
 
-  public static final String MESSAGE_DIGIT_NAME = "MD5";
-  public static final String SYNC_DIR_NAME_SEPARATOR = "_";
+  public static final String SYNC_LOG_NAME = "sync.log";
 
   /**
    * Split data file , block size at each transmission
@@ -43,7 +50,7 @@ public class Constans {
   public static final int DATA_CHUNK_SIZE = 64 * 1024 * 1024;
 
   /**
-   * Max try when syncing the same file to receiver fails.
+   * Maximum number of tries when syncing the same file to the receiver fails.
    */
   public static final int MAX_SYNC_FILE_TRY = 5;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
index 1a529be..684db79 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
@@ -21,11 +21,35 @@ package org.apache.iotdb.db.sync.sender.manage;
 import java.io.File;
 import java.io.IOException;
 
+/**
+ * This interface is used to manage deleted files and new closed files that need to be synchronized in each
+ * sync task.
+ */
 public interface ISyncFileManager {
 
+  /**
+   * Find out all closed and unmodified files, i.e. those that have a .resource file and don't
+   * have a .mod file. For these files, they will eventually generate a new tsfile file as the merge
+   * operation is executed and executed in subsequent synchronization tasks.
+   *
+   * @param dataDir data directory
+   */
   void getCurrentLocalFiles(String dataDir);
 
+  /**
+   * Load last local files from file<lastLocalFile> which does not contain those tsfiles which are
+   * not synced successfully in previous sync tasks.
+   *
+   * @param lastLocalFile last local file, which may not exist in first sync task.
+   */
   void getLastLocalFiles(File lastLocalFile) throws IOException;
 
+  /**
+   * Based on current local files and last local files, we can distinguish two kinds of files
+   * between them, one is deleted files, the other is new files. These two kinds of files are valid
+   * files that need to be synchronized to the receiving end.
+   *
+   * @param dataDir data directory
+   */
   void getValidFiles(String dataDir) throws IOException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
index c08b64f..23af20c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -39,12 +39,32 @@ public class SyncFileManager implements ISyncFileManager {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SyncFileManager.class);
 
+  /**
+   * All storage groups on the disk where the current sync task is executed
+   */
+  private Set<String> allSG;
+
+  /**
+   * Key is storage group, value is the set of current sealed tsfile in the sg.
+   */
   private Map<String, Set<File>> currentSealedLocalFilesMap;
 
+  /**
+   * Key is storage group, value is the set of last local tsfiles in the sg, which doesn't contain
+   * those tsfiles which are not synced successfully.
+   */
   private Map<String, Set<File>> lastLocalFilesMap;
 
+  /**
+   * Key is storage group, value is the valid set of deleted tsfiles which need to be synced to
+   * receiver end in the sg.
+   */
   private Map<String, Set<File>> deletedFilesMap;
 
+  /**
+   * Key is storage group, value is the valid set of new tsfiles which need to be synced to
+   * receiver end in the sg.
+   */
   private Map<String, Set<File>> toBeSyncedFilesMap;
 
   private SyncFileManager() {
@@ -58,15 +78,18 @@ public class SyncFileManager implements ISyncFileManager {
   @Override
   public void getCurrentLocalFiles(String dataDir) {
     LOGGER.info("Start to get current local files in data folder {}", dataDir);
+
     // get all files in data dir sequence folder
     Map<String, Set<File>> currentAllLocalFiles = new HashMap<>();
     File[] allSGFolders = new File(
         dataDir + File.separatorChar + IoTDBConstant.SEQUENCE_FLODER_NAME)
         .listFiles();
     for (File sgFolder : allSGFolders) {
+      allSG.add(sgFolder.getName());
       currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashSet<>());
-      Arrays.stream(sgFolder.listFiles()).forEach(file -> currentAllLocalFiles.get(sgFolder.getName())
-          .add(new File(sgFolder.getAbsolutePath(), file.getName())));
+      Arrays.stream(sgFolder.listFiles())
+          .forEach(file -> currentAllLocalFiles.get(sgFolder.getName())
+              .add(new File(sgFolder.getAbsolutePath(), file.getName())));
     }
 
     // get sealed tsfiles
@@ -92,10 +115,14 @@ public class SyncFileManager implements ISyncFileManager {
     LOGGER.info("Start to get last local files from last local file info {}",
         lastLocalFileInfo.getAbsoluteFile());
     lastLocalFilesMap = new HashMap<>();
+    if(!lastLocalFileInfo.exists()){
+      return;
+    }
     try (BufferedReader reader = new BufferedReader(new FileReader(lastLocalFileInfo))) {
       String fileName;
       while ((fileName = reader.readLine()) != null) {
         String sgName = new File(fileName).getParent();
+        allSG.add(sgName);
         lastLocalFilesMap.putIfAbsent(sgName, new HashSet<>());
         lastLocalFilesMap.get(sgName).add(new File(fileName));
       }
@@ -104,21 +131,21 @@ public class SyncFileManager implements ISyncFileManager {
 
   @Override
   public void getValidFiles(String dataDir) throws IOException {
+    allSG = new HashSet<>();
     getCurrentLocalFiles(dataDir);
     getLastLocalFiles(new File(SyncSenderDescriptor.getInstance().getConfig().getLastFileInfo()));
     toBeSyncedFilesMap = new HashMap<>();
     deletedFilesMap = new HashMap<>();
-    for(Entry<String, Set<File>> entry: currentSealedLocalFilesMap.entrySet()){
-      String sgName = entry.getKey();
+    for (String sgName : allSG) {
       toBeSyncedFilesMap.putIfAbsent(sgName, new HashSet<>());
       deletedFilesMap.putIfAbsent(sgName, new HashSet<>());
-      for(File newFile:currentSealedLocalFilesMap.get(sgName)){
-        if(!lastLocalFilesMap.get(sgName).contains(newFile)){
+      for (File newFile : currentSealedLocalFilesMap.getOrDefault(sgName, new HashSet<>())) {
+        if (!lastLocalFilesMap.getOrDefault(sgName, new HashSet<>()).contains(newFile)) {
           toBeSyncedFilesMap.get(sgName).add(newFile);
         }
       }
-      for(File oldFile:lastLocalFilesMap.get(sgName)){
-        if(!currentSealedLocalFilesMap.get(sgName).contains(oldFile)){
+      for (File oldFile : lastLocalFilesMap.getOrDefault(sgName, new HashSet<>())) {
+        if (!currentSealedLocalFilesMap.getOrDefault(sgName, new HashSet<>()).contains(oldFile)) {
           deletedFilesMap.get(sgName).add(oldFile);
         }
       }
@@ -137,6 +164,10 @@ public class SyncFileManager implements ISyncFileManager {
     return toBeSyncedFilesMap;
   }
 
+  public Set<String> getAllSG() {
+    return allSG;
+  }
+
   private static class SyncFileManagerHolder {
 
     private static final SyncFileManager INSTANCE = new SyncFileManager();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
index 53cb54f..d0c09b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
@@ -20,14 +20,19 @@ package org.apache.iotdb.db.sync.sender.recover;
 
 import java.util.Set;
 
+/**
+ * This interface is used to restore and clean up the status of the historical synchronization task
+ * with abnormal termination. Through the analysis of the synchronization task log, the completed
+ * progress is merged to prepare for the next synchronization task.
+ */
 public interface ISyncSenderLogAnalyzer {
 
+  void recover();
+
   void loadLastLocalFiles(Set<String> lastLocalFiles);
 
   void loadLogger(Set<String> deletedFiles, Set<String> newFiles);
 
-  void recover();
-
   void clearLogger(Set<String> currentLocalFiles);
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
index d1cdf9e..0229579 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
@@ -21,14 +21,40 @@ package org.apache.iotdb.db.sync.sender.recover;
 import java.io.File;
 import java.io.IOException;
 
+/**
+ * This interface is used to log progress in the process of synchronization tasks. If the
+ * synchronization tasks are completed normally and there are no exceptions, the log records will be
+ * deleted; otherwise, the status can be restored according to the log at the start of each task. It
+ * ensures the correctness of the synchronization module when a system crash or a network
+ * abnormality occurs.
+ */
 public interface ISyncSenderLogger {
 
+  /**
+   * Start syncing deleted file names
+   * @throws IOException
+   */
   void startSyncDeletedFilesName() throws IOException;
 
+  /**
+   * After a deleted file name is synced to the receiver end, record it in sync log.
+   * @param file the deleted tsfile
+   * @throws IOException
+   */
   void finishSyncDeletedFileName(File file) throws IOException;
 
+  /**
+   * Start syncing new tsfiles
+   * @throws IOException
+   */
   void startSyncTsFiles() throws IOException;
 
+  /**
+   *
+   * After a new tsfile is synced to the receiver end, record it in sync log.
+   * @param file new tsfile
+   * @throws IOException
+   */
   void finishSyncTsfile(File file) throws IOException;
 
   void close() throws IOException;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
index 9649806..3acfd66 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer{
+public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderLogAnalyzer.class);
   private File currentLocalFile;
@@ -109,6 +109,7 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer{
     } catch (IOException e) {
       LOGGER.error("Can not clear sync log {}", syncLogFile.getAbsoluteFile(), e);
     }
+    lastLocalFile.deleteOnExit();
     currentLocalFile.renameTo(lastLocalFile);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index fc68960..12803f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -1,4 +1,21 @@
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.sender.transfer;
 
 import java.io.BufferedReader;
@@ -19,15 +36,18 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
@@ -60,30 +80,38 @@ public class DataTransferManager implements IDataTransferManager {
 
   private static final int BATCH_LINE = 1000;
 
+  /**
+   * When transferring schema information, it is a better choice to transfer only new schema
+   * information, avoiding duplicate data transmission. The schema log is self-increasing, so the
+   * location is recorded once after each synchronization task for the next synchronization task to
+   * use.
+   */
   private int schemaFileLinePos;
 
   private TTransport transport;
 
   private SyncService.Client serviceClient;
 
-  /**
-   * Files that need to be synchronized
-   */
+  private Set<String> allSG;
+
   private Map<String, Set<File>> toBeSyncedFilesMap;
 
   private Map<String, Set<File>> deletedFilesMap;
 
-  private Map<String, Set<File>> sucessSyncedFilesMap;
+  private Map<String, Set<File>> lastLocalFilesMap;
 
-  private Map<String, Set<File>> successDeletedFilesMap;
+  private Map<String, Set<File>> successSyncedFilesMap = new HashMap<>();
 
-  private Map<String, Set<File>> lastLocalFilesMap;
+  private Map<String, Set<File>> successDeletedFilesMap = new HashMap<>();
 
   /**
    * If true, sync is in execution.
    **/
   private volatile boolean syncStatus = false;
 
+  /**
+   * Record sync progress in log.
+   */
   private SyncSenderLogger syncLog;
 
   private SyncFileManager syncFileManager = SyncFileManager.getInstance();
@@ -99,7 +127,7 @@ public class DataTransferManager implements IDataTransferManager {
   }
 
   /**
-   * Create a sender and sync files to the receiver.
+   * Create a sender and sync files to the receiver periodically.
    */
   public static void main(String[] args) throws IOException {
     Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
@@ -109,10 +137,9 @@ public class DataTransferManager implements IDataTransferManager {
     fileSenderImpl.startTimedTask();
   }
 
-
   /**
-   * The method is to verify whether the client lock file is locked or not, ensuring that only one
-   * client is running.
+   * Verify whether the client lock file is locked or not, ensuring that only one client is
+   * running.
    */
   private void verifySingleton() throws IOException {
     File lockFile = new File(config.getLockFilePath());
@@ -165,7 +192,7 @@ public class DataTransferManager implements IDataTransferManager {
   }
 
   /**
-   * Start Monitor Thread, monitor sync status
+   * Start the monitor thread, which monitors the sync status.
    */
   private void startMonitor() {
     executorService.scheduleWithFixedDelay(() -> {
@@ -195,11 +222,12 @@ public class DataTransferManager implements IDataTransferManager {
     executorService = null;
   }
 
+  @Override
   public void syncAll() throws SyncConnectionException, IOException, TException {
 
     // 1. Connect to sync receiver and confirm identity
     establishConnection(config.getServerIp(), config.getServerPort());
-    if (!confirmIdentity(config.getSenderPath())) {
+    if (!confirmIdentity()) {
       logger.error("Sorry, you do not have the permission to connect to sync receiver.");
       System.exit(1);
     }
@@ -212,16 +240,18 @@ public class DataTransferManager implements IDataTransferManager {
     for (String dataDir : dataDirs) {
       logger.info("Start to sync data in data dir {}", dataDir);
       config.update(dataDir);
-      SyncFileManager.getInstance().getValidFiles(dataDir);
-      lastLocalFilesMap = SyncFileManager.getInstance().getLastLocalFilesMap();
-      deletedFilesMap = SyncFileManager.getInstance().getDeletedFilesMap();
-      toBeSyncedFilesMap = SyncFileManager.getInstance().getToBeSyncedFilesMap();
+      syncFileManager.getValidFiles(dataDir);
+      allSG = syncFileManager.getAllSG();
+      lastLocalFilesMap = syncFileManager.getLastLocalFilesMap();
+      deletedFilesMap = syncFileManager.getDeletedFilesMap();
+      toBeSyncedFilesMap = syncFileManager.getToBeSyncedFilesMap();
       checkRecovery();
       if (SyncUtils.isEmpty(deletedFilesMap) && SyncUtils.isEmpty(toBeSyncedFilesMap)) {
         logger.info("There has no data to sync in data dir {}", dataDir);
         continue;
       }
       sync();
+      endSync();
       logger.info("Finish to sync data in data dir {}", dataDir);
     }
 
@@ -236,31 +266,161 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
+  private void checkRecovery() { // delegates to SyncSenderLogAnalyzer for the sender path
+    new SyncSenderLogAnalyzer(config.getSenderPath()).recover();
+  }
+
+  @Override
+  public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
+    // Create the socket transport and the client on top of it, then open the connection.
+    transport = new TSocket(serverIp, serverPort);
+    serviceClient = new SyncService.Client(new TBinaryProtocol(transport));
+    try {
+      transport.open();
+    } catch (TTransportException openFailure) {
+      logger.error("Cannot connect to the receiver.");
+      throw new SyncConnectionException(openFailure);
+    }
+  }
+
+  @Override
+  public boolean confirmIdentity() throws SyncConnectionException {
+    try { // the local host address is what gets submitted to the receiver
+      String localAddress = InetAddress.getLocalHost().getHostAddress();
+      return ResultStatus.SUCCESS == serviceClient.checkIdentity(localAddress);
+    } catch (Exception identityFailure) {
+      logger.error("Cannot confirm identity with the receiver.");
+      throw new SyncConnectionException(identityFailure);
+    }
+  }
+
+  /**
+   * Sync the schema log to the receiver, repeating the attempt until it succeeds.
+   *
+   * @throws SyncConnectionException if every attempt allowed by the retry limit fails
+   */
+  @Override
+  public void syncSchema() throws SyncConnectionException, TException {
+    serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
+    // One initial attempt plus at most MAX_SYNC_FILE_TRY retries.
+    for (int attempt = 0; attempt <= Constans.MAX_SYNC_FILE_TRY; attempt++) {
+      if (tryToSyncSchema()) {
+        // Record the new position only after a successful transfer.
+        writeSyncSchemaPos(getSchemaPosFile());
+        return;
+      }
+    }
+    throw new SyncConnectionException(String
+        .format("Can not sync schema after %s retries.", Constans.MAX_SYNC_FILE_TRY));
+  }
+
+  /**
+   * Make one attempt to send the schema log lines that have not been synced yet.
+   *
+   * @return true if no batch was rejected and the digest check passed, false otherwise
+   */
+  private boolean tryToSyncSchema() {
+    int schemaPos = readSyncSchemaPos(getSchemaPosFile());
+    try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
+      // Skip exactly the schemaPos lines already synced; stop early if the log is shorter.
+      schemaFileLinePos = 0;
+      while (schemaFileLinePos < schemaPos && br.readLine() != null) {
+        schemaFileLinePos++;
+      }
+      MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
+      String line;
+      int cntLine = 0;
+      while ((line = br.readLine()) != null) {
+        schemaFileLinePos++;
+        byte[] singleLineData = BytesUtils.stringToBytes(line);
+        bos.write(singleLineData);
+        md.update(singleLineData);
+        if (cntLine++ == BATCH_LINE) {
+          ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+          bos.reset();
+          if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
+            logger.error("Receiver failed to receive metadata, retry.");
+            return false;
+          }
+          cntLine = 0;
+        }
+      }
+      if (bos.size() != 0
+          && serviceClient.syncData(ByteBuffer.wrap(bos.toByteArray())) == ResultStatus.FAILURE) {
+        logger.error("Receiver failed to receive metadata, retry.");
+        return false;
+      }
+      // Compare the digest of the sent lines with the one reported by the receiver.
+      return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16));
+    } catch (NoSuchAlgorithmException | IOException | TException e) {
+      logger.error("Can not finish transfer schema to receiver", e);
+      return false;
+    }
+  }
+
   /**
-   * Execute a sync task.
+   * Check MD5 of schema to make sure that the receiver receives the schema correctly
    */
+  private boolean checkMD5ForSchema(String md5OfSender) throws TException {
+    // The receiver replies with its own digest; equality means the schema data arrived intact.
+    String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
+    if (md5OfSender.equals(md5OfReceiver)) {
+      logger.info("Receiver has received schema successfully.");
+      return true;
+    }
+    logger
+        .error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
+    return false;
+  }
+
+  private int readSyncSchemaPos(File syncSchemaLogFile) {
+    if (!syncSchemaLogFile.exists()) {
+      return 0;
+    }
+    try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
+      return Integer.parseInt(br.readLine());
+    } catch (IOException | NumberFormatException e) {
+      // An empty or corrupted position file must not abort the sync; start again from 0.
+      logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
+      return 0;
+    }
+  }
+
+  private void writeSyncSchemaPos(File syncSchemaLogFile) { // persists schemaFileLinePos
+    try {
+      if (!syncSchemaLogFile.exists()) {
+        syncSchemaLogFile.createNewFile();
+      }
+      try (BufferedWriter br = new BufferedWriter(new FileWriter(syncSchemaLogFile))) {
+        br.write(Integer.toString(schemaFileLinePos)); // replaces the previous content
+      }
+    } catch (IOException e) { // failure is only logged; the caller is not informed
+      logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
+    }
+  }
+
   @Override
   public void sync() throws IOException {
     try {
       syncStatus = true;
       syncLog = new SyncSenderLogger(getSchemaLogFile());
+      successSyncedFilesMap = new HashMap<>();
+      successDeletedFilesMap = new HashMap<>();
 
-      // 1. Sync data
-      for (Entry<String, Set<File>> entry : deletedFilesMap.entrySet()) {
-        checkRecovery();
+      for (String sgName : allSG) {
         syncLog = new SyncSenderLogger(getSyncLogFile());
-        // TODO deal with the situation
         try {
-          if (serviceClient.init(entry.getKey()) == ResultStatus.FAILURE) {
+          if (serviceClient.init(sgName) == ResultStatus.FAILURE) {
             throw new SyncConnectionException("unable init receiver");
           }
         } catch (TException | SyncConnectionException e) {
           throw new SyncConnectionException("Unable to connect to receiver", e);
         }
-        logger.info("Sync process starts to transfer data of storage group {}", entry.getKey());
-        syncDeletedFilesName(entry.getKey(), entry.getValue());
-        syncDataFilesInOneGroup(entry.getKey(), entry.getValue());
-        clearSyncLog();
+        logger.info("Sync process starts to transfer data of storage group {}", sgName);
+        syncDeletedFilesNameInOneGroup(sgName,
+            deletedFilesMap.getOrDefault(sgName, new HashSet<>()));
+        syncDataFilesInOneGroup(sgName, toBeSyncedFilesMap.getOrDefault(sgName, new HashSet<>()));
       }
 
     } catch (SyncConnectionException e) {
@@ -273,12 +433,9 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
-  private void checkRecovery() {
-    new SyncSenderLogAnalyzer(config.getSenderPath()).recover();
-  }
-
   @Override
-  public void syncDeletedFilesName(String sgName, Set<File> deletedFilesName) throws IOException {
+  public void syncDeletedFilesNameInOneGroup(String sgName, Set<File> deletedFilesName)
+      throws IOException {
     if (deletedFilesName.isEmpty()) {
       logger.info("There has no deleted files to be synced in storage group {}", sgName);
       return;
@@ -313,7 +470,8 @@ public class DataTransferManager implements IDataTransferManager {
       try {
         snapshotFile = makeFileSnapshot(tsfile);
         syncSingleFile(snapshotFile);
-        sucessSyncedFilesMap.get(sgName).add(tsfile);
+        syncSingleFile(new File(snapshotFile, TsFileResource.RESOURCE_SUFFIX));
+        successSyncedFilesMap.get(sgName).add(tsfile);
         syncLog.finishSyncTsfile(tsfile);
         logger.info("Task of synchronization has completed {}/{}.", cnt, toBeSyncFiles.size());
       } catch (IOException e) {
@@ -329,6 +487,11 @@ public class DataTransferManager implements IDataTransferManager {
     logger.info("Sync process has finished storage group {}.", sgName);
   }
 
+  /**
+   * Make a snapshot (hard link) of the new tsfile and of its resource file.
+   *
+   * @param file new tsfile to be synced
+   */
   private File makeFileSnapshot(File file) throws IOException {
     File snapshotFile = SyncUtils.getSnapshotFile(file);
     if (!snapshotFile.getParentFile().exists()) {
@@ -337,11 +500,16 @@ public class DataTransferManager implements IDataTransferManager {
     Path link = FileSystems.getDefault().getPath(snapshotFile.getAbsolutePath());
     Path target = FileSystems.getDefault().getPath(snapshotFile.getAbsolutePath());
     Files.createLink(link, target);
+    link = FileSystems.getDefault()
+        .getPath(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    target = FileSystems.getDefault()
+        .getPath(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    Files.createLink(link, target);
     return snapshotFile;
   }
 
   /**
-   * Transfer data of a storage group to receiver.
+   * Transfer data of a tsfile to the receiver.
    */
   private void syncSingleFile(File snapshotFile) throws SyncConnectionException {
     try {
@@ -387,156 +555,18 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
-
-  /**
-   * Establish a connection between the sender and the receiver.
-   *
-   * @param serverIp the ip address of the receiver
-   * @param serverPort must be same with port receiver set.
-   */
-  @Override
-  public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
-    transport = new TSocket(serverIp, serverPort);
-    TProtocol protocol = new TBinaryProtocol(transport);
-    serviceClient = new SyncService.Client(protocol);
-    try {
-      transport.open();
-    } catch (TTransportException e) {
-      logger.error("Cannot connect to server");
-      throw new SyncConnectionException(e);
-    }
-  }
-
-  /**
-   * UUID marks the identity of sender for receiver.
-   */
-  @Override
-  public boolean confirmIdentity(String uuidPath) throws SyncConnectionException {
-    try {
-      return serviceClient.checkIdentity(InetAddress.getLocalHost().getHostAddress())
-          == ResultStatus.SUCCESS;
-    } catch (Exception e) {
-      logger.error("Cannot confirm identity with receiver");
-      throw new SyncConnectionException(e);
-    }
-  }
-
-  /**
-   * Sync schema with receiver.
-   */
-  @Override
-  public void syncSchema() throws SyncConnectionException, TException {
-    int retryCount = 0;
-    serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
-    while (true) {
-      if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
-        throw new SyncConnectionException(String
-            .format("Can not sync schema after %s tries.", Constans.MAX_SYNC_FILE_TRY));
-      }
-      try {
-        if (tryToSyncSchema()) {
-          writeSyncSchemaPos(getSchemaPosFile());
-          break;
-        }
-      } finally {
-        retryCount++;
-      }
-    }
-  }
-
-  private boolean tryToSyncSchema() {
-    int schemaPos = readSyncSchemaPos(getSchemaPosFile());
-
-    // start to sync file data and get md5 of this file.
-    try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
-        ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
-      schemaFileLinePos = 0;
-      while (schemaFileLinePos++ <= schemaPos) {
-        br.readLine();
-      }
-      MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
-      String line;
-      int cntLine = 0;
-      while ((line = br.readLine()) != null) {
-        schemaFileLinePos++;
-        byte[] singleLineData = BytesUtils.stringToBytes(line);
-        bos.write(singleLineData);
-        md.update(singleLineData);
-        if (cntLine++ == BATCH_LINE) {
-          ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
-          bos.reset();
-          // PROCESSING_STATUS represents there is still schema buffer to send.
-          if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
-            logger.error("Receiver failed to receive metadata, retry.");
-            return false;
-          }
-          cntLine = 0;
-        }
-      }
-      if (bos.size() != 0) {
-        ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
-        bos.reset();
-        if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
-          logger.error("Receiver failed to receive metadata, retry.");
-          return false;
-        }
-      }
-
-      // check md5
-      return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16));
-    } catch (NoSuchAlgorithmException | IOException | TException e) {
-      logger.error("Can not finish transfer schema to receiver", e);
-      return false;
-    }
-  }
-
-
-  private boolean checkMD5ForSchema(String md5OfSender) throws TException {
-    String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
-    if (md5OfSender.equals(md5OfReceiver)) {
-      logger.info("Receiver has received schema successfully, retry.");
-      return true;
-    } else {
-      logger
-          .error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
-      return false;
-    }
-  }
-
-  private int readSyncSchemaPos(File syncSchemaLogFile) {
-    try {
-      if (syncSchemaLogFile.exists()) {
-        try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
-          return Integer.parseInt(br.readLine());
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
-    }
-    return 0;
-  }
-
-  private void writeSyncSchemaPos(File syncSchemaLogFile) {
-    try {
-      if (syncSchemaLogFile.exists()) {
-        try (BufferedWriter br = new BufferedWriter(new FileWriter(syncSchemaLogFile))) {
-          br.write(Integer.toString(schemaFileLinePos));
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
-    }
-  }
-
-  private void clearSyncLog() {
+  private void endSync() {
+    // 1. Organize current local files based on sync result
     for (Entry<String, Set<File>> entry : lastLocalFilesMap.entrySet()) {
       entry.getValue()
           .removeAll(successDeletedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
       entry.getValue()
-          .removeAll(sucessSyncedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
+          .removeAll(successSyncedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
     }
     File currentLocalFile = getCurrentLogFile();
     File lastLocalFile = new File(config.getLastFileInfo());
+
+    // 2. Write file list to currentLocalFile
     try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
       for (Set<File> currentLocalFiles : lastLocalFilesMap.values()) {
         for (File file : currentLocalFiles) {
@@ -548,7 +578,20 @@ public class DataTransferManager implements IDataTransferManager {
     } catch (IOException e) {
       logger.error("Can not clear sync log {}", lastLocalFile.getAbsoluteFile(), e);
     }
+
+    // 3. Rename currentLocalFile to lastLocalFile
+    lastLocalFile.deleteOnExit();
     currentLocalFile.renameTo(lastLocalFile);
+
+    // 4. delete snapshot directory
+    try {
+      FileUtils.deleteDirectory(new File(config.getSnapshotPath()));
+    } catch (IOException e) {
+      logger.error("Can not clear snapshot directory {}", config.getSnapshotPath(), e);
+    }
+
+    // 5. delete sync log file
+    getSyncLogFile().deleteOnExit();
   }
 
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
index 3366b91..bff3f2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
@@ -25,45 +25,61 @@ import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.thrift.TException;
 
 /**
- * SyncSender defines the methods of a sender in sync module.
+ * This interface defines the data transmission part of a synchronization task, which is also the
+ * most important part of the task. The files to be transferred are screened out by
+ * {@code SyncFileManager}; these files are then synchronized to the receiving end to complete
+ * the synchronization task.
  */
 public interface IDataTransferManager {
 
-  /**
-   * Init
-   */
   void init();
 
   /**
-   * Connect to server.
+   * Establish a connection to receiver end.
    */
   void establishConnection(String serverIp, int serverPort) throws SyncConnectionException;
 
   /**
-   * Transfer UUID to receiver.
+   * Confirm identity, the receiver will check whether the sender has synchronization privileges.
    */
-  boolean confirmIdentity(String uuidPath) throws SyncConnectionException, IOException;
+  boolean confirmIdentity() throws SyncConnectionException, IOException;
 
   /**
-   * Send schema file to receiver.
+   * Sync the schema file to the receiver before any data is synced.
    */
   void syncSchema() throws SyncConnectionException, TException;
 
-  void syncDeletedFilesName(String sgName, Set<File> deletedFilesName)
+  /**
+   * For deleted files in a storage group, sync them to receiver side and load these data in
+   * receiver.
+   *
+   * @param sgName storage group name
+   * @param deletedFilesName list of deleted file names
+   */
+  void syncDeletedFilesNameInOneGroup(String sgName, Set<File> deletedFilesName)
       throws SyncConnectionException, IOException;
 
   /**
-   * For all valid files, send it to receiver side and load these data in receiver.
+   * Execute a sync task for all data directories.
    */
-  void syncDataFilesInOneGroup(String sgName, Set<File> deletedFilesName)
-      throws SyncConnectionException, IOException;
+  void syncAll() throws SyncConnectionException, IOException, TException;
 
   /**
-   * Execute a sync task.
+   * Execute a sync task for a data directory.
    */
   void sync() throws SyncConnectionException, IOException;
 
   /**
+   * For new valid files in a storage group, sync them to receiver side and load these data in
+   * receiver.
+   *
+   * @param sgName storage group name
+   * @param toBeSyncFiles list of new tsfile names
+   */
+  void syncDataFilesInOneGroup(String sgName, Set<File> toBeSyncFiles)
+      throws SyncConnectionException, IOException;
+
+  /**
    * Stop sync process
    */
   void stop();