You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/08/22 03:14:13 UTC

[incubator-iotdb] 02/05: complete manage module

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit ab86f899065649273be9cf188e7620c48fe120e0
Author: lta <li...@163.com>
AuthorDate: Wed Aug 21 15:39:25 2019 +0800

    complete manage module
---
 server/iotdb/conf/iotdb-sync-client.properties     |  12 +-
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   4 +
 .../db/conf/directories/DirectoryManager.java      |   8 +-
 .../org/apache/iotdb/db/metadata/MManager.java     |   2 +-
 .../db/sync/receiver/transfer/SyncServiceImpl.java |  65 +-
 .../receiver/transfer/SyncServiceImplBackup.java   | 736 +++++++++++++++++++++
 .../iotdb/db/sync/sender/SyncFileManager.java      | 208 ------
 .../apache/iotdb/db/sync/sender/conf/Constans.java |  17 +-
 .../db/sync/sender/conf/SyncSenderConfig.java      | 129 ++--
 .../db/sync/sender/conf/SyncSenderDescriptor.java  |  36 +-
 .../{IFileManager.java => ISyncFileManager.java}   |  12 +-
 .../db/sync/sender/manage/SyncFileManager.java     | 157 +++++
 .../db/sync/sender/recover/ISyncSenderLogger.java  |  21 +-
 .../db/sync/sender/recover/SyncSenderLogger.java   |  86 +++
 .../sync/sender/transfer/DataTransferManager.java  | 542 ++++++++-------
 .../sync/sender/transfer/IDataTransferManager.java |  16 +-
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  |  39 +-
 .../iotdb/db/sync/sender/SingleClientSyncTest.java |   4 +-
 .../iotdb/db/sync/sender/SyncFileManagerTest.java  |   2 +-
 service-rpc/src/main/thrift/sync.thrift            |  21 +-
 20 files changed, 1460 insertions(+), 657 deletions(-)

diff --git a/server/iotdb/conf/iotdb-sync-client.properties b/server/iotdb/conf/iotdb-sync-client.properties
index 65b3074..3606ab4 100644
--- a/server/iotdb/conf/iotdb-sync-client.properties
+++ b/server/iotdb/conf/iotdb-sync-client.properties
@@ -17,19 +17,11 @@
 # under the License.
 #
 
-# Sync server port address
+# Sync receiver server address
 server_ip=127.0.0.1
 
-# Sync client port
+# Sync receiver server port
 server_port=5555
 
 # The period time of sync process, the time unit is second.
 sync_period_in_second=600
-
-# Set bufferWrite data absolute path of IoTDB
-# It needs to be set with iotdb_schema_directory, they have to belong to the same IoTDB
-# iotdb_bufferWrite_directory = D:\\iotdb\\data\\data\\settled
-
-# Set schema file absolute path of IoTDB
-# It needs to be set with iotdb_bufferWrite_directory, they have to belong to the same IoTDB
-# iotdb_schema_directory = D:\\iotdb\\data\\system\\schema\\mlog.txt
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 3ea1621..13042aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -65,4 +65,8 @@ public class IoTDBConstant {
   public static final String USER = "User";
   public static final String PRIVILEGE = "Privilege";
 
+  // data folder name
+  public static final String SEQUENCE_FLODER_NAME = "sequence";
+  public static final String UNSEQUENCE_FLODER_NAME = "unsequence";
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
index 9b4b58d..e3b8d5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategy;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
@@ -44,14 +45,16 @@ public class DirectoryManager {
     sequenceFileFolders =
         new ArrayList<>(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
     for (int i = 0; i < sequenceFileFolders.size(); i++) {
-      sequenceFileFolders.set(i, sequenceFileFolders.get(i) + File.separator + "sequence");
+      sequenceFileFolders
+          .set(i, sequenceFileFolders.get(i) + File.separator + IoTDBConstant.SEQUENCE_FLODER_NAME);
     }
     mkDirs(sequenceFileFolders);
 
     unsequenceFileFolders =
         new ArrayList<>(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
     for (int i = 0; i < unsequenceFileFolders.size(); i++) {
-      unsequenceFileFolders.set(i, unsequenceFileFolders.get(i) + File.separator + "unsequence");
+      unsequenceFileFolders.set(i,
+          unsequenceFileFolders.get(i) + File.separator + IoTDBConstant.UNSEQUENCE_FLODER_NAME);
     }
     mkDirs(unsequenceFileFolders);
 
@@ -120,6 +123,7 @@ public class DirectoryManager {
   }
 
   private static class DirectoriesHolder {
+
     private static final DirectoryManager INSTANCE = new DirectoryManager();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 1116ff9..ad50573 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -80,7 +80,7 @@ public class MManager {
   private MManager() {
 
     schemaDir =
-        IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "schema";
+        IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
 
     File systemFolder = new File(schemaDir);
     if (!systemFolder.exists()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 4866d41..02bb5b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -45,10 +45,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.MetadataOperationType;
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.ResultStatus;
 import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
@@ -73,6 +74,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,22 +88,13 @@ public class SyncServiceImpl implements SyncService.Iface {
    **/
   private static final MManager metadataManger = MManager.getInstance();
 
-  private static final String SYNC_SERVER = Constans.SYNC_SERVER;
+  private static final String SYNC_SERVER = Constans.SYNC_RECEIVER;
 
-  private ThreadLocal<String> uuid = new ThreadLocal<>();
   /**
    * String means storage group,List means the set of new files(path) in local IoTDB and String
    * means path of new Files
    **/
   private ThreadLocal<Map<String, List<String>>> fileNodeMap = new ThreadLocal<>();
-  /**
-   * Map String1 means timeseries String2 means path of new Files, long means startTime
-   **/
-  private ThreadLocal<Map<String, Map<String, Long>>> fileNodeStartTime = new ThreadLocal<>();
-  /**
-   * Map String1 means timeseries String2 means path of new Files, long means endTime
-   **/
-  private ThreadLocal<Map<String, Map<String, Long>>> fileNodeEndTime = new ThreadLocal<>();
 
   /**
    * Total num of files that needs to be loaded
@@ -131,28 +124,41 @@ public class SyncServiceImpl implements SyncService.Iface {
   /**
    * Sync folder path of server
    **/
-  private String syncFolderPath;
+  private ThreadLocal<String> syncFolderPath = new ThreadLocal<>();
 
   /**
    * Sync data path of server
    */
-  private String syncDataPath;
+  private ThreadLocal<String> syncDataPath = new ThreadLocal<>();
+
+  private ThreadLocal<String> currentSG = new ThreadLocal<>();
+
+
+  /**
+   * Verify IP address of sender
+   */
+  @Override
+  public ResultStatus checkIdentity(String ipAddress) {
+    Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
+    initPath();
+    return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress) ? ResultStatus.SUCCESS
+        : ResultStatus.FAILURE;
+  }
 
   /**
    * Init threadLocal variable and delete old useless files.
    */
   @Override
-  public boolean init(String storageGroup) {
+  public ResultStatus init(String storageGroup) {
     logger.info("Sync process starts to receive data of storage group {}", storageGroup);
+    currentSG.set(storageGroup);
     fileNum.set(0);
     fileNodeMap.set(new HashMap<>());
-    fileNodeStartTime.set(new HashMap<>());
-    fileNodeEndTime.set(new HashMap<>());
     try {
       FileUtils.deleteDirectory(new File(syncDataPath));
     } catch (IOException e) {
       logger.error("cannot delete directory {} ", syncFolderPath);
-      return false;
+      return ResultStatus.FAILURE;
     }
     for (String bufferWritePath : bufferWritePaths) {
       bufferWritePath = FilePathUtils.regularizePath(bufferWritePath);
@@ -163,22 +169,11 @@ public class SyncServiceImpl implements SyncService.Iface {
           FileUtils.deleteDirectory(backupDirectory);
         } catch (IOException e) {
           logger.error("cannot delete directory {} ", syncFolderPath);
-          return false;
+          return ResultStatus.FAILURE;
         }
       }
     }
-    return true;
-  }
-
-  /**
-   * Verify IP address of sender
-   */
-  @Override
-  public boolean checkIdentity(String uuid, String ipAddress) {
-    Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
-    this.uuid.set(uuid);
-    initPath();
-    return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress);
+    return ResultStatus.SUCCESS;
   }
 
   /**
@@ -242,6 +237,16 @@ public class SyncServiceImpl implements SyncService.Iface {
     return md5OfReceiver;
   }
 
+  @Override
+  public String checkDataMD5(String md5) throws TException {
+    return null;
+  }
+
+  @Override
+  public void endSync() throws TException {
+
+  }
+
   /**
    * Load metadata from sender
    */
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java
new file mode 100644
index 0000000..6f2f754
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java
@@ -0,0 +1,736 @@
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//package org.apache.iotdb.db.sync.receiver.transfer;
+//
+//import java.io.BufferedReader;
+//import java.io.File;
+//import java.io.FileInputStream;
+//import java.io.FileNotFoundException;
+//import java.io.FileOutputStream;
+//import java.io.IOException;
+//import java.math.BigInteger;
+//import java.nio.ByteBuffer;
+//import java.nio.channels.FileChannel;
+//import java.security.MessageDigest;
+//import java.util.ArrayList;
+//import java.util.Collections;
+//import java.util.HashMap;
+//import java.util.HashSet;
+//import java.util.Iterator;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.Map.Entry;
+//import java.util.Set;
+//import org.apache.commons.io.FileUtils;
+//import org.apache.commons.lang3.StringUtils;
+//import org.apache.iotdb.db.concurrent.ThreadName;
+//import org.apache.iotdb.db.conf.IoTDBConfig;
+//import org.apache.iotdb.db.conf.IoTDBDescriptor;
+//import org.apache.iotdb.db.conf.directories.DirectoryManager;
+//import org.apache.iotdb.db.engine.StorageEngine;
+//import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+//import org.apache.iotdb.db.exception.MetadataErrorException;
+//import org.apache.iotdb.db.exception.PathErrorException;
+//import org.apache.iotdb.db.exception.ProcessorException;
+//import org.apache.iotdb.db.exception.StorageEngineException;
+//import org.apache.iotdb.db.metadata.MManager;
+//import org.apache.iotdb.db.metadata.MetadataConstant;
+//import org.apache.iotdb.db.metadata.MetadataOperationType;
+//import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+//import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+//import org.apache.iotdb.db.sync.sender.conf.Constans;
+//import org.apache.iotdb.db.utils.FilePathUtils;
+//import org.apache.iotdb.db.utils.SyncUtils;
+//import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
+//import org.apache.iotdb.service.sync.thrift.SyncService;
+//import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+//import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+//import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+//import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+//import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+//import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+//import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+//import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+//import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+//import org.apache.iotdb.tsfile.read.common.Field;
+//import org.apache.iotdb.tsfile.read.common.Path;
+//import org.apache.iotdb.tsfile.read.common.RowRecord;
+//import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+//import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//public class SyncServiceImplBackup implements SyncService.Iface {
+//
+//  private static final Logger logger = LoggerFactory.getLogger(SyncServiceImplBackup.class);
+//
+//  private static final StorageEngine STORAGE_GROUP_MANAGER = StorageEngine.getInstance();
+//  /**
+//   * Metadata manager
+//   **/
+//  private static final MManager metadataManger = MManager.getInstance();
+//
+//  private static final String SYNC_SERVER = Constans.SYNC_RECEIVER;
+//
+//  private ThreadLocal<String> uuid = new ThreadLocal<>();
+//  /**
+//   * String means storage group,List means the set of new files(path) in local IoTDB and String
+//   * means path of new Files
+//   **/
+//  private ThreadLocal<Map<String, List<String>>> fileNodeMap = new ThreadLocal<>();
+//  /**
+//   * Map String1 means timeseries String2 means path of new Files, long means startTime
+//   **/
+//  private ThreadLocal<Map<String, Map<String, Long>>> fileNodeStartTime = new ThreadLocal<>();
+//  /**
+//   * Map String1 means timeseries String2 means path of new Files, long means endTime
+//   **/
+//  private ThreadLocal<Map<String, Map<String, Long>>> fileNodeEndTime = new ThreadLocal<>();
+//
+//  /**
+//   * Total num of files that needs to be loaded
+//   */
+//  private ThreadLocal<Integer> fileNum = new ThreadLocal<>();
+//
+//  /**
+//   * IoTDB config
+//   **/
+//  private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+//
+//  /**
+//   * IoTDB data directory
+//   **/
+//  private String baseDir = config.getBaseDir();
+//
+//  /**
+//   * IoTDB  multiple bufferWrite directory
+//   **/
+//  private String[] bufferWritePaths = config.getDataDirs();
+//
+//  /**
+//   * The path to store metadata file of sender
+//   */
+//  private ThreadLocal<String> schemaFromSenderPath = new ThreadLocal<>();
+//
+//  /**
+//   * Sync folder path of server
+//   **/
+//  private String syncFolderPath;
+//
+//  /**
+//   * Sync data path of server
+//   */
+//  private String syncDataPath;
+//
+//  /**
+//   * Init threadLocal variable and delete old useless files.
+//   */
+//  @Override
+//  public boolean init(String storageGroup) {
+//    logger.info("Sync process starts to receive data of storage group {}", storageGroup);
+//    fileNum.set(0);
+//    fileNodeMap.set(new HashMap<>());
+//    fileNodeStartTime.set(new HashMap<>());
+//    fileNodeEndTime.set(new HashMap<>());
+//    try {
+//      FileUtils.deleteDirectory(new File(syncDataPath));
+//    } catch (IOException e) {
+//      logger.error("cannot delete directory {} ", syncFolderPath);
+//      return false;
+//    }
+//    for (String bufferWritePath : bufferWritePaths) {
+//      bufferWritePath = FilePathUtils.regularizePath(bufferWritePath);
+//      String backupPath = bufferWritePath + SYNC_SERVER + File.separator;
+//      File backupDirectory = new File(backupPath, this.uuid.get());
+//      if (backupDirectory.exists() && backupDirectory.list().length != 0) {
+//        try {
+//          FileUtils.deleteDirectory(backupDirectory);
+//        } catch (IOException e) {
+//          logger.error("cannot delete directory {} ", syncFolderPath);
+//          return false;
+//        }
+//      }
+//    }
+//    return true;
+//  }
+//
+//  /**
+//   * Verify IP address of sender
+//   */
+//  @Override
+//  public boolean checkIdentity(String uuid, String ipAddress) {
+//    Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
+//    this.uuid.set(uuid);
+//    initPath();
+//    return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress);
+//  }
+//
+//  /**
+//   * Init file path and clear data if last sync process failed.
+//   */
+//  private void initPath() {
+//    baseDir = FilePathUtils.regularizePath(baseDir);
+//    syncFolderPath = baseDir + SYNC_SERVER + File.separatorChar + this.uuid.get();
+//    syncDataPath = syncFolderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME;
+//    schemaFromSenderPath
+//        .set(syncFolderPath + File.separator + MetadataConstant.METADATA_LOG);
+//  }
+//
+//  /**
+//   * Acquire schema from sender
+//   *
+//   * @param status: FINIFSH_STATUS, SUCCESS_STATUS or PROCESSING_STATUS. status = FINISH_STATUS :
+//   * finish receiving schema file, start to sync schema. status = PROCESSING_STATUS : the schema
+//   * file has not received completely.SUCCESS_STATUS: load metadata.
+//   */
+//  @Override
+//  public String syncSchema(String md5, ByteBuffer schema, SyncDataStatus status) {
+//    String md5OfReceiver = Boolean.toString(Boolean.TRUE);
+//    if (status == SyncDataStatus.SUCCESS_STATUS) {
+//      /** sync metadata, include storage group and timeseries **/
+//      return Boolean.toString(loadMetadata());
+//    } else if (status == SyncDataStatus.PROCESSING_STATUS) {
+//      File file = new File(schemaFromSenderPath.get());
+//      if (!file.getParentFile().exists()) {
+//        try {
+//          file.getParentFile().mkdirs();
+//          file.createNewFile();
+//        } catch (IOException e) {
+//          logger.error("Cannot make schema file {}.", file.getPath(), e);
+//          md5OfReceiver = Boolean.toString(Boolean.FALSE);
+//        }
+//      }
+//      try (FileOutputStream fos = new FileOutputStream(file, true);
+//          FileChannel channel = fos.getChannel()) {
+//        channel.write(schema);
+//      } catch (Exception e) {
+//        logger.error("Cannot insert data to file {}.", file.getPath(), e);
+//        md5OfReceiver = Boolean.toString(Boolean.FALSE);
+//      }
+//    } else {
+//      try (FileInputStream fis = new FileInputStream(schemaFromSenderPath.get())) {
+//        MessageDigest md = MessageDigest.getInstance("MD5");
+//        byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
+//        int n;
+//        while ((n = fis.read(buffer)) != -1) {
+//          md.update(buffer, 0, n);
+//        }
+//        md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
+//        if (!md5.equals(md5OfReceiver)) {
+//          FileUtils.forceDelete(new File(schemaFromSenderPath.get()));
+//        }
+//      } catch (Exception e) {
+//        logger.error("Receiver cannot generate md5 {}", schemaFromSenderPath.get(), e);
+//      }
+//    }
+//    return md5OfReceiver;
+//  }
+//
+//  /**
+//   * Load metadata from sender
+//   */
+//  private boolean loadMetadata() {
+//    if (new File(schemaFromSenderPath.get()).exists()) {
+//      try (BufferedReader br = new BufferedReader(
+//          new java.io.FileReader(schemaFromSenderPath.get()))) {
+//        String metadataOperation;
+//        while ((metadataOperation = br.readLine()) != null) {
+//          operation(metadataOperation);
+//        }
+//      } catch (FileNotFoundException e) {
+//        logger.error("Cannot read the file {}.",
+//            schemaFromSenderPath.get(), e);
+//        return false;
+//      } catch (IOException e) {
+//        /** multiple insert schema, ignore it **/
+//      } catch (Exception e) {
+//        logger.error("Parse metadata operation failed.", e);
+//        return false;
+//      }
+//    }
+//    return true;
+//  }
+//
+//  /**
+//   * Operate metadata operation in MManager
+//   *
+//   * @param cmd metadata operation
+//   */
+//  private void operation(String cmd)
+//      throws PathErrorException, IOException, MetadataErrorException {
+//    String[] args = cmd.trim().split(",");
+//    switch (args[0]) {
+//      case MetadataOperationType.ADD_PATH_TO_MTREE:
+//        Map<String, String> props;
+//        String[] kv;
+//        props = new HashMap<>(args.length - 5 + 1, 1);
+//        for (int k = 5; k < args.length; k++) {
+//          kv = args[k].split("=");
+//          props.put(kv[0], kv[1]);
+//        }
+//        metadataManger.addPathToMTree(new Path(args[1]), TSDataType.deserialize(Short.valueOf(args[2])),
+//            TSEncoding.deserialize(Short.valueOf(args[3])),
+//            CompressionType.deserialize(Short.valueOf(args[4])),
+//            props);
+//        break;
+//      case MetadataOperationType.DELETE_PATH_FROM_MTREE:
+//        metadataManger.deletePaths(Collections.singletonList(new Path(args[1])));
+//        break;
+//      case MetadataOperationType.SET_STORAGE_LEVEL_TO_MTREE:
+//        metadataManger.setStorageLevelToMTree(args[1]);
+//        break;
+//      case MetadataOperationType.ADD_A_PTREE:
+//        metadataManger.addAPTree(args[1]);
+//        break;
+//      case MetadataOperationType.ADD_A_PATH_TO_PTREE:
+//        metadataManger.addPathToPTree(args[1]);
+//        break;
+//      case MetadataOperationType.DELETE_PATH_FROM_PTREE:
+//        metadataManger.deletePathFromPTree(args[1]);
+//        break;
+//      case MetadataOperationType.LINK_MNODE_TO_PTREE:
+//        metadataManger.linkMNodeToPTree(args[1], args[2]);
+//        break;
+//      case MetadataOperationType.UNLINK_MNODE_FROM_PTREE:
+//        metadataManger.unlinkMNodeFromPTree(args[1], args[2]);
+//        break;
+//      default:
+//        logger.error("Unrecognizable command {}", cmd);
+//    }
+//  }
+//
+//  /**
+//   * Start receiving tsfile from sender
+//   *
+//   * @param status status = SUCCESS_STATUS : finish receiving one tsfile status = PROCESSING_STATUS
+//   * : tsfile has not received completely.
+//   */
+//  @Override
+//  public String syncData(String md5OfSender, List<String> filePathSplit,
+//      ByteBuffer dataToReceive, SyncDataStatus status) {
+//    String md5OfReceiver = Boolean.toString(Boolean.TRUE);
+//    FileChannel channel;
+//    /** Recombination File Path **/
+//    String filePath = StringUtils.join(filePathSplit, File.separatorChar);
+//    syncDataPath = FilePathUtils.regularizePath(syncDataPath);
+//    filePath = syncDataPath + filePath;
+//    if (status == SyncDataStatus.PROCESSING_STATUS) { // there are still data stream to add
+//      File file = new File(filePath);
+//      if (!file.getParentFile().exists()) {
+//        try {
+//          file.getParentFile().mkdirs();
+//          file.createNewFile();
+//        } catch (IOException e) {
+//          logger.error("cannot make file {}", file.getPath(), e);
+//          md5OfReceiver = Boolean.toString(Boolean.FALSE);
+//        }
+//      }
+//      try (FileOutputStream fos = new FileOutputStream(file, true)) {// append new data
+//        channel = fos.getChannel();
+//        channel.write(dataToReceive);
+//      } catch (IOException e) {
+//        logger.error("cannot insert data to file {}", file.getPath(), e);
+//        md5OfReceiver = Boolean.toString(Boolean.FALSE);
+//
+//      }
+//    } else { // all data in the same file has received successfully
+//      try (FileInputStream fis = new FileInputStream(filePath)) {
+//        MessageDigest md = MessageDigest.getInstance("MD5");
+//        byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
+//        int n;
+//        while ((n = fis.read(buffer)) != -1) {
+//          md.update(buffer, 0, n);
+//        }
+//        md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
+//        if (md5OfSender.equals(md5OfReceiver)) {
+//          fileNum.set(fileNum.get() + 1);
+//
+//          logger.info(String.format("Receiver has received %d files from sender", fileNum.get()));
+//        } else {
+//          FileUtils.forceDelete(new File(filePath));
+//        }
+//      } catch (Exception e) {
+//        logger.error("Receiver cannot generate md5 {}", filePath, e);
+//      }
+//    }
+//    return md5OfReceiver;
+//  }
+//
+//
+//  @Override
+//  public boolean load() {
+//    try {
+//      getFileNodeInfo();
+//      loadData();
+//    } catch (Exception e) {
+//      logger.error("fail to load data", e);
+//      return false;
+//    }
+//    return true;
+//  }
+//
+//  /**
+//   * Get all tsfiles' info which are sent from sender, it is preparing for merging these data
+//   */
+//  public void getFileNodeInfo() throws IOException {
+//    File dataFileRoot = new File(syncDataPath);
+//    File[] files = dataFileRoot.listFiles();
+//    int processedNum = 0;
+//    for (File storageGroupPB : files) {
+//      List<String> filesPath = new ArrayList<>();
+//      File[] filesSG = storageGroupPB.listFiles();
+//      for (File fileTF : filesSG) { // fileTF means TsFiles
+//        Map<String, Long> startTimeMap = new HashMap<>();
+//        Map<String, Long> endTimeMap = new HashMap<>();
+//        TsFileSequenceReader reader = null;
+//        try {
+//          reader = new TsFileSequenceReader(fileTF.getPath());
+//          Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
+//          Iterator<String> it = deviceIdMap.keySet().iterator();
+//          while (it.hasNext()) {
+//            String key = it.next();
+//            TsDeviceMetadataIndex device = deviceIdMap.get(key);
+//            startTimeMap.put(key, device.getStartTime());
+//            endTimeMap.put(key, device.getEndTime());
+//          }
+//        } catch (IOException e) {
+//          logger.error("Unable to read tsfile {}", fileTF.getPath());
+//          throw new IOException(e);
+//        } finally {
+//          try {
+//            if (reader != null) {
+//              reader.close();
+//            }
+//          } catch (IOException e) {
+//            logger.error("Cannot close tsfile stream {}", fileTF.getPath());
+//            throw new IOException(e);
+//          }
+//        }
+//        fileNodeStartTime.get().put(fileTF.getPath(), startTimeMap);
+//        fileNodeEndTime.get().put(fileTF.getPath(), endTimeMap);
+//        filesPath.add(fileTF.getPath());
+//        processedNum++;
+//        logger.info(String
+//            .format("Get tsfile info has complete : %d/%d", processedNum, fileNum.get()));
+//        fileNodeMap.get().put(storageGroupPB.getName(), filesPath);
+//      }
+//    }
+//  }
+//
+//
+//  /**
+//   * It is to merge data. If data in the tsfile is new, append the tsfile to the storage group
+//   * directly. If data in the tsfile is old, it has two strategy to merge.It depends on the
+//   * possibility of updating historical data.
+//   */
+//  public void loadData() throws StorageEngineException {
+//    syncDataPath = FilePathUtils.regularizePath(syncDataPath);
+//    int processedNum = 0;
+//    for (String storageGroup : fileNodeMap.get().keySet()) {
+//      List<String> filesPath = fileNodeMap.get().get(storageGroup);
+//      /**  before load external tsFile, it is necessary to order files in the same storage group **/
+//      Collections.sort(filesPath, (o1, o2) -> {
+//        Map<String, Long> startTimePath1 = fileNodeStartTime.get().get(o1);
+//        Map<String, Long> endTimePath2 = fileNodeEndTime.get().get(o2);
+//        for (Entry<String, Long> entry : endTimePath2.entrySet()) {
+//          if (startTimePath1.containsKey(entry.getKey())) {
+//            if (startTimePath1.get(entry.getKey()) > entry.getValue()) {
+//              return 1;
+//            } else {
+//              return -1;
+//            }
+//          }
+//        }
+//        return 0;
+//      });
+//
+//      for (String path : filesPath) {
+//        // get startTimeMap and endTimeMap
+//        Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path);
+//        Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path);
+//
+//        // create a new fileNode
+//        String header = syncDataPath;
+//        String relativePath = path.substring(header.length());
+//        TsFileResource fileNode = new TsFileResource(
+//            new File(DirectoryManager.getInstance().getNextFolderIndexForSequenceFile() +
+//                File.separator + relativePath), startTimeMap, endTimeMap
+//        );
+//        // call interface of load external file
+//        try {
+//          if (!STORAGE_GROUP_MANAGER.appendFileToStorageGroupProcessor(storageGroup, fileNode, path)) {
+//            // it is a file with unsequence data
+//            if (config.isUpdateHistoricalDataPossibility()) {
+//              loadOldData(path);
+//            } else {
+//              List<String> overlapFiles = STORAGE_GROUP_MANAGER.getOverlapFiles(
+//                  storageGroup,
+//                  fileNode, uuid.get());
+//              if (overlapFiles.isEmpty()) {
+//                loadOldData(path);
+//              } else {
+//                loadOldData(path, overlapFiles);
+//              }
+//            }
+//          }
+//        } catch (StorageEngineException | IOException | ProcessorException e) {
+//          logger.error("Can not load external file {}", path);
+//          throw new StorageEngineException(e);
+//        }
+//        processedNum++;
+//        logger.info(String
+//            .format("Merging files has completed : %d/%d", processedNum, fileNum.get()));
+//      }
+//    }
+//  }
+//
+//  /**
+//   * Insert all data in the tsfile into IoTDB.
+//   */
+//  public void loadOldData(String filePath) throws IOException, ProcessorException {
+//    Set<String> timeseriesSet = new HashSet<>();
+//    TsFileSequenceReader reader = null;
+//    QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
+//    try {
+//      /** use tsfile reader to get data **/
+//      reader = new TsFileSequenceReader(filePath);
+//      Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
+//      Iterator<Entry<String, TsDeviceMetadataIndex>> entryIterator = deviceIdMap.entrySet()
+//          .iterator();
+//      while (entryIterator.hasNext()) {
+//        Entry<String, TsDeviceMetadataIndex> deviceMIEntry = entryIterator.next();
+//        String deviceId = deviceMIEntry.getKey();
+//        TsDeviceMetadataIndex deviceMI = deviceMIEntry.getValue();
+//        TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
+//        List<ChunkGroupMetaData> rowGroupMetadataList = deviceMetadata.getChunkGroupMetaDataList();
+//        timeseriesSet.clear();
+//        /** firstly, get all timeseries in the same device **/
+//        for (ChunkGroupMetaData chunkGroupMetaData : rowGroupMetadataList) {
+//          List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData
+//              .getChunkMetaDataList();
+//          for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
+//            String measurementUID = chunkMetaData.getMeasurementUid();
+//            measurementUID = deviceId + "." + measurementUID;
+//            timeseriesSet.add(measurementUID);
+//          }
+//        }
+//        /** Secondly, use tsFile Reader to form InsertPlan **/
+//        ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
+//        List<Path> paths = new ArrayList<>();
+//        paths.clear();
+//        for (String timeseries : timeseriesSet) {
+//          paths.add(new Path(timeseries));
+//        }
+//        QueryExpression queryExpression = QueryExpression.create(paths, null);
+//        QueryDataSet queryDataSet = readTsFile.query(queryExpression);
+//        while (queryDataSet.hasNext()) {
+//          RowRecord record = queryDataSet.next();
+//          List<Field> fields = record.getFields();
+//          List<String> measurementList = new ArrayList<>();
+//          List<String> insertValues = new ArrayList<>();
+//          for (int i = 0; i < fields.size(); i++) {
+//            Field field = fields.get(i);
+//            if (!field.isNull()) {
+//              measurementList.add(paths.get(i).getMeasurement());
+//              if (fields.get(i).getDataType() == TSDataType.TEXT) {
+//                insertValues.add(String.format("'%s'", field.toString()));
+//              } else {
+//                insertValues.add(String.format("%s", field.toString()));
+//              }
+//            }
+//          }
+//          if (insertExecutor.insert(new InsertPlan(deviceId, record.getTimestamp(),
+//              measurementList.toArray(new String[0]), insertValues.toArray(new String[0])))) {
+//            throw new IOException("Inserting series data to IoTDB engine has failed.");
+//          }
+//        }
+//      }
+//    } catch (IOException e) {
+//      logger.error("Can not parse tsfile into SQL", e);
+//      throw new IOException(e);
+//    } catch (ProcessorException e) {
+//      logger.error("Meet error while processing non-query.");
+//      throw new ProcessorException(e);
+//    } finally {
+//      try {
+//        if (reader != null) {
+//          reader.close();
+//        }
+//      } catch (IOException e) {
+//        logger.error("Cannot close file stream {}", filePath, e);
+//      }
+//    }
+//  }
+//
+//  /**
+//   * Insert those valid data in the tsfile into IoTDB
+//   *
+//   * @param overlapFiles:files which are conflict with the sync file
+//   */
+//  public void loadOldData(String filePath, List<String> overlapFiles)
+//      throws IOException, ProcessorException {
+//    Set<String> timeseriesList = new HashSet<>();
+//    QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
+//    Map<String, ReadOnlyTsFile> tsfilesReaders = openReaders(filePath, overlapFiles);
+//    try {
+//      TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
+//      Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
+//      Iterator<String> it = deviceIdMap.keySet().iterator();
+//      while (it.hasNext()) {
+//        String deviceID = it.next();
+//        TsDeviceMetadataIndex deviceMI = deviceIdMap.get(deviceID);
+//        TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
+//        List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
+//            .getChunkGroupMetaDataList();
+//        timeseriesList.clear();
+//        /** firstly, get all timeseries in the same device **/
+//        for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
+//          List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData.getChunkMetaDataList();
+//          for (ChunkMetaData timeSeriesChunkMetaData : chunkMetaDataList) {
+//            String measurementUID = timeSeriesChunkMetaData.getMeasurementUid();
+//            measurementUID = deviceID + "." + measurementUID;
+//            timeseriesList.add(measurementUID);
+//          }
+//        }
+//        reader.close();
+//
+//        /** secondly, use tsFile Reader to form SQL **/
+//        ReadOnlyTsFile readOnlyTsFile = tsfilesReaders.get(filePath);
+//        List<Path> paths = new ArrayList<>();
+//        /** compare data with one timeseries in a round to get valid data **/
+//        for (String timeseries : timeseriesList) {
+//          paths.clear();
+//          paths.add(new Path(timeseries));
+//          Set<InsertPlan> originDataPoints = new HashSet<>();
+//          QueryExpression queryExpression = QueryExpression.create(paths, null);
+//          QueryDataSet queryDataSet = readOnlyTsFile.query(queryExpression);
+//          Set<InsertPlan> newDataPoints = convertToInserPlans(queryDataSet, paths, deviceID);
+//
+//          /** get all data with the timeseries in all overlap files. **/
+//          for (String overlapFile : overlapFiles) {
+//            ReadOnlyTsFile readTsFileOverlap = tsfilesReaders.get(overlapFile);
+//            QueryDataSet queryDataSetOverlap = readTsFileOverlap.query(queryExpression);
+//            originDataPoints.addAll(convertToInserPlans(queryDataSetOverlap, paths, deviceID));
+//          }
+//
+//          /** If there has no overlap data with the timeseries, inserting all data in the sync file **/
+//          if (originDataPoints.isEmpty()) {
+//            for (InsertPlan insertPlan : newDataPoints) {
+//              if (insertExecutor.insert(insertPlan)) {
+//                throw new IOException("Inserting series data to IoTDB engine has failed.");
+//              }
+//            }
+//          } else {
+//            /** Compare every data to get valid data **/
+//            for (InsertPlan insertPlan : newDataPoints) {
+//              if (!originDataPoints.contains(insertPlan)) {
+//                if (insertExecutor.insert(insertPlan)) {
+//                  throw new IOException("Inserting series data to IoTDB engine has failed.");
+//                }
+//              }
+//            }
+//          }
+//        }
+//      }
+//    } catch (IOException e) {
+//      logger.error("Can not parse tsfile into SQL", e);
+//      throw new IOException(e);
+//    } catch (ProcessorException e) {
+//      logger.error("Meet error while processing non-query.", e);
+//      throw new ProcessorException(e);
+//    } finally {
+//      try {
+//        closeReaders(tsfilesReaders);
+//      } catch (IOException e) {
+//        logger.error("Cannot close file stream {}", filePath, e);
+//      }
+//    }
+//  }
+//
+//  private Set<InsertPlan> convertToInserPlans(QueryDataSet queryDataSet, List<Path> paths, String deviceID) throws IOException {
+//    Set<InsertPlan> plans = new HashSet<>();
+//    while (queryDataSet.hasNext()) {
+//      RowRecord record = queryDataSet.next();
+//      List<Field> fields = record.getFields();
+//      /** get all data with the timeseries in the sync file **/
+//      for (int i = 0; i < fields.size(); i++) {
+//        Field field = fields.get(i);
+//        String[] measurementList = new String[1];
+//        if (!field.isNull()) {
+//          measurementList[0] = paths.get(i).getMeasurement();
+//          InsertPlan insertPlan = new InsertPlan(deviceID, record.getTimestamp(),
+//              measurementList, new String[]{field.getDataType() == TSDataType.TEXT ? String.format("'%s'", field.toString())
+//              : field.toString()});
+//          plans.add(insertPlan);
+//        }
+//      }
+//    }
+//    return plans;
+//  }
+//
+//  /**
+//   * Open all tsfile reader and cache
+//   */
+//  private Map<String, ReadOnlyTsFile> openReaders(String filePath, List<String> overlapFiles)
+//      throws IOException {
+//    Map<String, ReadOnlyTsFile> tsfileReaders = new HashMap<>();
+//    tsfileReaders.put(filePath, new ReadOnlyTsFile(new TsFileSequenceReader(filePath)));
+//    for (String overlapFile : overlapFiles) {
+//      tsfileReaders.put(overlapFile, new ReadOnlyTsFile(new TsFileSequenceReader(overlapFile)));
+//    }
+//    return tsfileReaders;
+//  }
+//
+//  /**
+//   * Close all tsfile reader
+//   */
+//  private void closeReaders(Map<String, ReadOnlyTsFile> readers) throws IOException {
+//    for (ReadOnlyTsFile tsfileReader : readers.values()) {
+//      tsfileReader.close();
+//    }
+//  }
+//
+//  /**
+//   * Release threadLocal variable resources
+//   */
+//  @Override
+//  public void cleanUp() {
+//    uuid.remove();
+//    fileNum.remove();
+//    fileNodeMap.remove();
+//    fileNodeStartTime.remove();
+//    fileNodeEndTime.remove();
+//    schemaFromSenderPath.remove();
+//    try {
+//      FileUtils.deleteDirectory(new File(syncFolderPath));
+//    } catch (IOException e) {
+//      logger.error("can not delete directory {}", syncFolderPath, e);
+//    }
+//    logger.info("Synchronization has finished!");
+//  }
+//
+//  public Map<String, List<String>> getFileNodeMap() {
+//    return fileNodeMap.get();
+//  }
+//
+//  public void setFileNodeMap(Map<String, List<String>> fileNodeMap) {
+//    this.fileNodeMap.set(fileNodeMap);
+//  }
+//
+//}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java
deleted file mode 100644
index 18c3b60..0000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.sender;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.sync.sender.conf.Constans;
-import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
-import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * SyncFileManager is used to pick up those tsfiles need to sync.
- */
-public class SyncFileManager {
-
-  private static final Logger logger = LoggerFactory.getLogger(SyncFileManager.class);
-
-  /**
-   * Files that need to be synchronized
-   **/
-  private Map<String, Set<String>> validAllFiles = new HashMap<>();
-
-  /**
-   * All tsfiles in last synchronization process
-   **/
-  private Set<String> lastLocalFiles = new HashSet<>();
-
-  /**
-   * All tsfiles in data directory
-   **/
-  private Map<String, Set<String>> currentLocalFiles = new HashMap<>();
-
-  private SyncSenderConfig syncConfig = SyncSenderDescriptor.getInstance().getConfig();
-
-  private IoTDBConfig systemConfig = IoTDBDescriptor.getInstance().getConfig();
-
-  private static final String RESTORE_SUFFIX = ".restore";
-
-  private SyncFileManager() {
-  }
-
-  public static final SyncFileManager getInstance() {
-    return FileManagerHolder.INSTANCE;
-  }
-
-  /**
-   * Initialize SyncFileManager.
-   */
-  public void init() throws IOException {
-    validAllFiles.clear();
-    lastLocalFiles.clear();
-    currentLocalFiles.clear();
-    getLastLocalFileList(syncConfig.getLastFileInfo());
-    getCurrentLocalFileList(systemConfig.getDataDirs());
-    getValidFileList();
-  }
-
-  /**
-   * get files that needs to be synchronized
-   */
-  public void getValidFileList() {
-    for (Entry<String, Set<String>> entry : currentLocalFiles.entrySet()) {
-      for (String path : entry.getValue()) {
-        if (!lastLocalFiles.contains(path)) {
-          validAllFiles.get(entry.getKey()).add(path);
-        }
-      }
-    }
-    logger.info("Acquire list of valid files.");
-    for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) {
-      for (String path : entry.getValue()) {
-        currentLocalFiles.get(entry.getKey()).remove(path);
-      }
-    }
-  }
-
-  /**
-   * get last local file list.
-   *
-   * @param path path
-   */
-  public void getLastLocalFileList(String path) throws IOException {
-    Set<String> fileList = new HashSet<>();
-    File file = new File(path);
-    if (!file.exists()) {
-      try {
-        file.createNewFile();
-      } catch (IOException e) {
-        throw new IOException("Cannot get last local file list", e);
-      }
-    } else {
-      try (BufferedReader bf = new BufferedReader(new FileReader(file))) {
-        String fileName;
-        while ((fileName = bf.readLine()) != null) {
-          fileList.add(fileName);
-        }
-      } catch (IOException e) {
-        logger.error("Cannot get last local file list when reading file {}.",
-            syncConfig.getLastFileInfo());
-        throw new IOException(e);
-      }
-    }
-    lastLocalFiles = fileList;
-  }
-
-  /**
-   * get current local file list.
-   *
-   * @param paths paths in String[] structure
-   */
-  public void getCurrentLocalFileList(String[] paths) {
-    for (String path : paths) {
-      if (!new File(path).exists()) {
-        continue;
-      }
-      File[] listFiles = new File(path).listFiles();
-      for (File storageGroup : listFiles) {
-        if (!storageGroup.isDirectory() || storageGroup.getName().equals(Constans.SYNC_CLIENT)) {
-          continue;
-        }
-        getStorageGroupFiles(storageGroup);
-      }
-    }
-  }
-
-  private void getStorageGroupFiles(File storageGroup) {
-    if (!currentLocalFiles.containsKey(storageGroup.getName())) {
-      currentLocalFiles.put(storageGroup.getName(), new HashSet<>());
-    }
-    if (!validAllFiles.containsKey(storageGroup.getName())) {
-      validAllFiles.put(storageGroup.getName(), new HashSet<>());
-    }
-    File[] files = storageGroup.listFiles();
-    for (File file : files) {
-      if (!file.getPath().endsWith(RESTORE_SUFFIX) && !new File(
-          file.getPath() + RESTORE_SUFFIX).exists()) {
-        currentLocalFiles.get(storageGroup.getName()).add(file.getPath());
-      }
-    }
-  }
-
-  /**
-   * backup current local file information.
-   *
-   * @param backupFile backup file path
-   */
-  public void backupNowLocalFileInfo(String backupFile) {
-    try (BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(backupFile))) {
-      for (Entry<String, Set<String>> entry : currentLocalFiles.entrySet()) {
-        for (String file : entry.getValue()) {
-          bufferedWriter.write(file + "\n");
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Cannot back up current local file info", e);
-    }
-  }
-
-  public Map<String, Set<String>> getValidAllFiles() {
-    return validAllFiles;
-  }
-
-  public Set<String> getLastLocalFiles() {
-    return lastLocalFiles;
-  }
-
-  public Map<String, Set<String>> getCurrentLocalFiles() {
-    return currentLocalFiles;
-  }
-
-  public void setCurrentLocalFiles(Map<String, Set<String>> newNowLocalFiles) {
-    currentLocalFiles = newNowLocalFiles;
-  }
-
-  private static class FileManagerHolder {
-
-    private static final SyncFileManager INSTANCE = new SyncFileManager();
-  }
-}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index c8e2ce2..05b67c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -24,15 +24,18 @@ public class Constans {
   }
 
   public static final String CONFIG_NAME = "iotdb-sync-client.properties";
-  public static final String SYNC_CLIENT = "sync-client";
-  public static final String SYNC_SERVER = "sync-server";
+  public static final String SYNC_SENDER = "sync-sender";
+  public static final String SYNC_RECEIVER = "sync-receiver";
 
-  public static final String LOCK_FILE_NAME = "sync-lock";
-  public static final String UUID_FILE_NAME = "uuid.txt";
+  public static final String LOCK_FILE_NAME = "sync_lock";
+  public static final String SCHEMA_POS_FILE_NAME = "sync_schema_pos";
   public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
-  public static final String DATA_SNAPSHOT_NAME = "data-snapshot";
+  public static final String DATA_SNAPSHOT_NAME = "snapshot";
+  public static final String SYNC_LOG_NAME = "sync.log";
+  public static final String CURRENT_SYNC_LOG_NAME = "current_sync.log";
 
-  public static final String BACK_UP_DIRECTORY_NAME = "backup";
+  public static final String MESSAGE_DIGIT_NAME = "MD5";
+  public static final String SYNC_DIR_NAME_SEPARATOR = "_";
 
   /**
    * Split data file , block size at each transmission
@@ -42,7 +45,7 @@ public class Constans {
   /**
    * Max try when syncing the same file to receiver fails.
    */
-  public static final int MAX_SYNC_FILE_TRY = 10;
+  public static final int MAX_SYNC_FILE_TRY = 5;
 
   private static final SyncSenderConfig CONFIG = SyncSenderDescriptor.getInstance().getConfig();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
index 572e5df..a076257 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
@@ -19,104 +19,35 @@
 package org.apache.iotdb.db.sync.sender.conf;
 
 import java.io.File;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.metadata.MetadataConstant;
-import org.apache.iotdb.db.utils.FilePathUtils;
 
 public class SyncSenderConfig {
 
-  private String[] seqFileDirectory = IoTDBDescriptor.getInstance().getConfig()
-      .getDataDirs();
-
-  private String dataDirectory = IoTDBDescriptor.getInstance().getConfig().getBaseDir();
-
-  private String lockFilePath;
-
-  private String uuidPath;
-
-  private String lastFileInfo;
-
-  private String[] snapshotPaths;
-
-  private String schemaPath;
-
   private String serverIp = "127.0.0.1";
 
   private int serverPort = 5555;
 
   private int syncPeriodInSecond = 10;
 
-  /**
-   * Init path
-   */
-  public void init() {
-    schemaPath = IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + MetadataConstant.METADATA_LOG;
-    if (dataDirectory.length() > 0
-        && dataDirectory.charAt(dataDirectory.length() - 1) != File.separatorChar) {
-      dataDirectory += File.separatorChar;
-    }
-    lockFilePath =
-        dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LOCK_FILE_NAME;
-    uuidPath = dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.UUID_FILE_NAME;
-    lastFileInfo =
-        dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME;
-    snapshotPaths = new String[seqFileDirectory.length];
-    for (int i = 0; i < seqFileDirectory.length; i++) {
-      seqFileDirectory[i] = new File(seqFileDirectory[i]).getAbsolutePath();
-      seqFileDirectory[i] = FilePathUtils.regularizePath(seqFileDirectory[i]);
-      snapshotPaths[i] = seqFileDirectory[i] + Constans.SYNC_CLIENT + File.separatorChar
-          + Constans.DATA_SNAPSHOT_NAME
-          + File.separatorChar;
-    }
-
-  }
-
-  public String[] getSeqFileDirectory() {
-    return seqFileDirectory;
-  }
-
-  public void setSeqFileDirectory(String[] seqFileDirectory) {
-    this.seqFileDirectory = seqFileDirectory;
-  }
-
-  public String getDataDirectory() {
-    return dataDirectory;
-  }
-
-  public void setDataDirectory(String dataDirectory) {
-    this.dataDirectory = dataDirectory;
-  }
-
-  public String getUuidPath() {
-    return uuidPath;
-  }
-
-  public void setUuidPath(String uuidPath) {
-    this.uuidPath = uuidPath;
-  }
-
-  public String getLastFileInfo() {
-    return lastFileInfo;
-  }
+  private String senderPath;
 
-  public void setLastFileInfo(String lastFileInfo) {
-    this.lastFileInfo = lastFileInfo;
-  }
+  private String lockFilePath;
 
-  public String[] getSnapshotPaths() {
-    return snapshotPaths;
-  }
+  private String lastFileInfo;
 
-  public void setSnapshotPaths(String[] snapshotPaths) {
-    this.snapshotPaths = snapshotPaths;
-  }
+  private String snapshotPath;
 
-  public String getSchemaPath() {
-    return schemaPath;
-  }
-
-  public void setSchemaPath(String schemaPath) {
-    this.schemaPath = schemaPath;
+  /**
+   * Update paths based on data directory
+   */
+  public void update(String dataDirectory) {
+    senderPath = dataDirectory + File.separatorChar + Constans.SYNC_SENDER + File.separatorChar +
+        getSyncReceiverName();
+    lockFilePath = senderPath + File.separatorChar + Constans.LOCK_FILE_NAME;
+    lastFileInfo = senderPath + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME;
+    snapshotPath = senderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME;
+    if(!new File(snapshotPath).exists()){
+      new File(snapshotPath).mkdirs();
+    }
   }
 
   public String getServerIp() {
@@ -143,6 +74,14 @@ public class SyncSenderConfig {
     this.syncPeriodInSecond = syncPeriodInSecond;
   }
 
+  public String getSenderPath() {
+    return senderPath;
+  }
+
+  public void setSenderPath(String senderPath) {
+    this.senderPath = senderPath;
+  }
+
   public String getLockFilePath() {
     return lockFilePath;
   }
@@ -150,4 +89,24 @@ public class SyncSenderConfig {
   public void setLockFilePath(String lockFilePath) {
     this.lockFilePath = lockFilePath;
   }
+
+  public String getLastFileInfo() {
+    return lastFileInfo;
+  }
+
+  public void setLastFileInfo(String lastFileInfo) {
+    this.lastFileInfo = lastFileInfo;
+  }
+
+  public String getSnapshotPath() {
+    return snapshotPath;
+  }
+
+  public void setSnapshotPath(String snapshotPath) {
+    this.snapshotPath = snapshotPath;
+  }
+
+  public String getSyncReceiverName() {
+    return serverIp + Constans.SYNC_DIR_NAME_SEPARATOR + serverPort;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java
index 2427c10..df1b834 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
 import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.utils.FilePathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,8 +37,8 @@ public class SyncSenderDescriptor {
     loadProps();
   }
 
-  public static final SyncSenderDescriptor getInstance() {
-    return PostBackDescriptorHolder.INSTANCE;
+  public static SyncSenderDescriptor getInstance() {
+    return SyncSenderDescriptorHolder.INSTANCE;
   }
 
   public SyncSenderConfig getConfig() {
@@ -54,7 +53,6 @@ public class SyncSenderDescriptor {
    * load an properties file and set sync config variables
    */
   private void loadProps() {
-    conf.init();
     InputStream inputStream;
     String url = System.getProperty(IoTDBConstant.IOTDB_CONF, null);
     if (url == null) {
@@ -90,42 +88,20 @@ public class SyncSenderDescriptor {
       conf.setSyncPeriodInSecond(Integer.parseInt(properties
           .getProperty("sync_period_in_second",
               Integer.toString(conf.getSyncPeriodInSecond()))));
-      conf.setSchemaPath(properties.getProperty("iotdb_schema_directory", conf.getSchemaPath()));
-      conf.setDataDirectory(
-          properties.getProperty("iotdb_bufferWrite_directory", conf.getDataDirectory()));
-      String dataDirectory = conf.getDataDirectory();
-      if (dataDirectory.length() > 0
-          && dataDirectory.charAt(dataDirectory.length() - 1) != File.separatorChar) {
-        dataDirectory += File.separatorChar;
-      }
-      conf.setUuidPath(
-          dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.UUID_FILE_NAME);
-      conf.setLastFileInfo(
-          dataDirectory + Constans.SYNC_CLIENT + File.separatorChar
-              + Constans.LAST_LOCAL_FILE_NAME);
-      String[] sequenceFileDirectory = conf.getSeqFileDirectory();
-      String[] snapshots = new String[conf.getSeqFileDirectory().length];
-      for (int i = 0; i < conf.getSeqFileDirectory().length; i++) {
-        sequenceFileDirectory[i] = FilePathUtils.regularizePath(sequenceFileDirectory[i]);
-        snapshots[i] = sequenceFileDirectory[i] + Constans.SYNC_CLIENT + File.separatorChar
-            + Constans.DATA_SNAPSHOT_NAME + File.separatorChar;
-      }
-      conf.setSeqFileDirectory(sequenceFileDirectory);
-      conf.setSnapshotPaths(snapshots);
     } catch (IOException e) {
-      logger.warn("Cannot load config file because {}, use default configuration", e);
+      logger.warn("Cannot load config file, use default configuration.", e);
     } catch (Exception e) {
-      logger.warn("Error format in config file because {}, use default configuration", e);
+      logger.warn("Error format in config file, use default configuration.", e);
     } finally {
       try {
         inputStream.close();
       } catch (IOException e) {
-        logger.error("Fail to close sync config file input stream because ", e);
+        logger.error("Fail to close sync config file input stream.", e);
       }
     }
   }
 
-  private static class PostBackDescriptorHolder {
+  private static class SyncSenderDescriptorHolder {
 
     private static final SyncSenderDescriptor INSTANCE = new SyncSenderDescriptor();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/IFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
similarity index 69%
rename from server/src/main/java/org/apache/iotdb/db/sync/sender/manage/IFileManager.java
rename to server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
index b1ba08f..d4f7e33 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/IFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
@@ -18,8 +18,18 @@
  */
 package org.apache.iotdb.db.sync.sender.manage;
 
-public interface IFileManager {
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
 
+public interface ISyncFileManager {
 
+  void getCurrentLocalFiles(String dataDir);
 
+  void getLastLocalFiles(File lastLocalFile) throws IOException;
+
+  void getValidFiles(String dataDir) throws IOException;
+
+  void updateLastLocalFiles(File lastLocalFile, Set<String> localFiles);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
new file mode 100644
index 0000000..20f1dca
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.manage;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SyncFileManager implements ISyncFileManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncFileManager.class);
+
+  private Map<String, Set<File>> currentSealedLocalFilesMap;
+
+  private Map<String, Set<File>> lastLocalFilesMap;
+
+  private Map<String, Set<File>> deletedFilesMap;
+
+  private Map<String, Set<File>> toBeSyncedFilesMap;
+
+  private SyncFileManager() {
+
+  }
+
+  public static final SyncFileManager getInstance() {
+    return SyncFileManagerHolder.INSTANCE;
+  }
+
+  @Override
+  public void getCurrentLocalFiles(String dataDir) {
+    LOGGER.info("Start to get current local files in data folder {}", dataDir);
+    // get all files in data dir sequence folder
+    Map<String, Set<File>> currentAllLocalFiles = new HashMap<>();
+    File[] allSGFolders = new File(
+        dataDir + File.separatorChar + IoTDBConstant.SEQUENCE_FLODER_NAME)
+        .listFiles();
+    for (File sgFolder : allSGFolders) {
+      currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashSet<>());
+      Arrays.stream(sgFolder.listFiles()).forEach(file -> currentAllLocalFiles.get(sgFolder.getName())
+          .add(new File(sgFolder.getAbsolutePath(), file.getName())));
+    }
+
+    // get sealed tsfiles
+    currentSealedLocalFilesMap = new HashMap<>();
+    for (Entry<String, Set<File>> entry : currentAllLocalFiles.entrySet()) {
+      String sgName = entry.getKey();
+      currentSealedLocalFilesMap.putIfAbsent(sgName, new HashSet<>());
+      for (File file : entry.getValue()) {
+        if (file.getName().endsWith(ModificationFile.FILE_SUFFIX) || file.getName()
+            .endsWith(TsFileResource.RESOURCE_SUFFIX)) {
+          continue;
+        }
+        if (new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists() && !new File(
+            file.getAbsolutePath() + ModificationFile.FILE_SUFFIX).exists()) {
+          currentSealedLocalFilesMap.get(sgName).add(file);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void getLastLocalFiles(File lastLocalFileInfo) throws IOException {
+    LOGGER.info("Start to get last local files from last local file info {}",
+        lastLocalFileInfo.getAbsoluteFile());
+    lastLocalFilesMap = new HashMap<>();
+    try (BufferedReader reader = new BufferedReader(new FileReader(lastLocalFileInfo))) {
+      String fileName;
+      while ((fileName = reader.readLine()) != null) {
+        String sgName = new File(fileName).getParent();
+        lastLocalFilesMap.putIfAbsent(sgName, new HashSet<>());
+        lastLocalFilesMap.get(sgName).add(new File(fileName));
+      }
+    }
+  }
+
+  @Override
+  public void getValidFiles(String dataDir) throws IOException {
+    getCurrentLocalFiles(dataDir);
+    getLastLocalFiles(new File(SyncSenderDescriptor.getInstance().getConfig().getLastFileInfo()));
+    toBeSyncedFilesMap = new HashMap<>();
+    deletedFilesMap = new HashMap<>();
+    for(Entry<String, Set<File>> entry: currentSealedLocalFilesMap.entrySet()){
+      String sgName = entry.getKey();
+      toBeSyncedFilesMap.putIfAbsent(sgName, new HashSet<>());
+      deletedFilesMap.putIfAbsent(sgName, new HashSet<>());
+      for(File newFile:currentSealedLocalFilesMap.get(sgName)){
+        if(!lastLocalFilesMap.get(sgName).contains(newFile)){
+          toBeSyncedFilesMap.get(sgName).add(newFile);
+        }
+      }
+      for(File oldFile:lastLocalFilesMap.get(sgName)){
+        if(!currentSealedLocalFilesMap.get(sgName).contains(oldFile)){
+          deletedFilesMap.get(sgName).add(oldFile);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void updateLastLocalFiles(File lastLocalFile, Set<String> localFiles) {
+
+  }
+
+  public Map<String, Set<File>> getCurrentSealedLocalFilesMap() {
+    return currentSealedLocalFilesMap;
+  }
+
+  public Map<String, Set<File>> getLastLocalFilesMap() {
+    return lastLocalFilesMap;
+  }
+
+  public Map<String, Set<File>> getDeletedFilesMap() {
+    return deletedFilesMap;
+  }
+
+  public Map<String, Set<File>> getToBeSyncedFilesMap() {
+    return toBeSyncedFilesMap;
+  }
+
+  private static class SyncFileManagerHolder {
+
+    private static final SyncFileManager INSTANCE = new SyncFileManager();
+
+    private SyncFileManagerHolder() {
+
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
index c516936..15df693 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
@@ -18,18 +18,27 @@
  */
 package org.apache.iotdb.db.sync.sender.recover;
 
+import java.io.File;
+import java.io.IOException;
+
 public interface ISyncSenderLogger {
 
-  void startSyncDeletedFilesName();
+  void startSync() throws IOException;
+
+  void endSync() throws IOException;
+
+  void startSyncDeletedFilesName() throws IOException;
+
+  void finishSyncDeletedFileName(File file) throws IOException;
 
-  void finishSyncDeletedFileName(String fileName);
+  void endSyncDeletedFilsName() throws IOException;
 
-  void endSyncDeletedFilsName();
+  void startSyncTsFiles() throws IOException;
 
-  void startSyncTsFiles();
+  void finishSyncTsfile(File file) throws IOException;
 
-  void finishSyncTsfile(String fileName);
+  void endSyncTsFiles() throws IOException;
 
-  void endSyncTsFiles();
+  void close() throws IOException;
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
new file mode 100644
index 0000000..8171d0f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
@@ -0,0 +1,86 @@
+package org.apache.iotdb.db.sync.sender.recover;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+public class SyncSenderLogger implements ISyncSenderLogger {
+
+  public static final String SYNC_START = "sync start";
+  public static final String SYNC_END = "sync end";
+  public static final String SYNC_DELETED_FILE_NAME_START = "sync deleted file names start";
+  public static final String SYNC_DELETED_FILE_NAME_END = "sync deleted file names end";
+  public static final String SYNC_TSFILE_START = "sync tsfile start";
+  public static final String SYNC_TSFILE_END = "sync tsfile end";
+  private BufferedWriter bw;
+
+  public SyncSenderLogger(String filePath) throws IOException {
+    this.bw = new BufferedWriter(new FileWriter(filePath));
+  }
+
+  public SyncSenderLogger(File file) throws IOException {
+    this(file.getAbsolutePath());
+  }
+
+  @Override
+  public void startSync() throws IOException {
+    bw.write(SYNC_START);
+    bw.newLine();
+    bw.flush();
+  }
+
+  @Override
+  public void endSync() throws IOException {
+    bw.write(SYNC_END);
+    bw.newLine();
+    bw.flush();
+  }
+
+  @Override
+  public void startSyncDeletedFilesName() throws IOException {
+    bw.write(SYNC_DELETED_FILE_NAME_START);
+    bw.newLine();
+    bw.flush();
+  }
+
+  @Override
+  public void finishSyncDeletedFileName(File file) throws IOException {
+    bw.write(file.getAbsolutePath());
+    bw.newLine();
+    bw.flush();
+  }
+
+  @Override
+  public void endSyncDeletedFilsName() throws IOException {
+    bw.write(SYNC_DELETED_FILE_NAME_END);
+    bw.newLine();
+    bw.flush();
+  }
+
+  @Override
+  public void startSyncTsFiles() throws IOException {
+    bw.write(SYNC_TSFILE_START);
+    bw.newLine();
+    bw.flush();
+  }
+
+  @Override
+  public void finishSyncTsfile(File file) throws IOException {
+    bw.write(file.getAbsolutePath());
+    bw.newLine();
+    bw.flush();
+  }
+
+  @Override
+  public void endSyncTsFiles() throws IOException {
+    bw.write(SYNC_TSFILE_END);
+    bw.newLine();
+    bw.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    bw.close();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index 59dc27e..c616376 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -1,28 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.iotdb.db.sync.sender.transfer;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.math.BigInteger;
@@ -33,27 +32,30 @@ import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.MessageDigest;
-import java.util.ArrayList;
+import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.SyncConnectionException;
-import org.apache.iotdb.db.sync.sender.SyncFileManager;
+import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
 import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.manage.SyncFileManager;
+import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
 import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.ResultStatus;
 import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
 import org.apache.iotdb.service.sync.thrift.SyncService;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -70,29 +72,36 @@ public class DataTransferManager implements IDataTransferManager {
 
   private static final Logger logger = LoggerFactory.getLogger(DataTransferManager.class);
 
-  private TTransport transport;
+  private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
 
-  private SyncService.Client serviceClient;
+  private static final int BATCH_LINE = 1000;
 
-  private List<String> schema = new ArrayList<>();
+  private int schemaFileLinePos;
 
-  private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
+  private TTransport transport;
+
+  private SyncService.Client serviceClient;
 
   /**
    * Files that need to be synchronized
    */
-  private Map<String, Set<String>> validAllFiles;
+  private Map<String, Set<File>> toBeSyncedFilesMap;
 
-  /**
-   * All tsfiles in data directory
-   **/
-  private Map<String, Set<String>> currentLocalFiles;
+  private Map<String, Set<File>> deletedFilesMap;
+
+  private Map<String, Set<File>> sucessSyncedFilesMap;
+
+  private Map<String, Set<File>> successDeleyedFilesMap;
+
+  private Map<String, Set<File>> lastLocalFilesMap;
 
   /**
    * If true, sync is in execution.
    **/
   private volatile boolean syncStatus = false;
 
+  private SyncSenderLogger syncLog;
+
   /**
    * Key means storage group, Set means corresponding tsfiles
    **/
@@ -106,14 +115,12 @@ public class DataTransferManager implements IDataTransferManager {
     init();
   }
 
-  public static final DataTransferManager getInstance() {
+  public static DataTransferManager getInstance() {
     return InstanceHolder.INSTANCE;
   }
 
   /**
    * Create a sender and sync files to the receiver.
-   *
-   * @param args not used
    */
   public static void main(String[] args) throws IOException {
     Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
@@ -137,7 +144,7 @@ public class DataTransferManager implements IDataTransferManager {
   private void startMonitor() {
     executorService.scheduleWithFixedDelay(() -> {
       if (syncStatus) {
-        logger.info("Sync process is in execution!");
+        logger.info("Sync process for receiver {} is in execution!", config.getSyncReceiverName());
       }
     }, Constans.SYNC_MONITOR_DELAY, Constans.SYNC_MONITOR_PERIOD, TimeUnit.SECONDS);
   }
@@ -148,8 +155,8 @@ public class DataTransferManager implements IDataTransferManager {
   private void startTimedTask() {
     executorService.scheduleWithFixedDelay(() -> {
       try {
-        sync();
-      } catch (SyncConnectionException | IOException e) {
+        syncAll();
+      } catch (SyncConnectionException | IOException | TException e) {
         logger.error("Sync failed", e);
         stop();
       }
@@ -162,101 +169,203 @@ public class DataTransferManager implements IDataTransferManager {
     executorService = null;
   }
 
-  /**
-   * Execute a sync task.
-   */
-  @Override
-  public void sync() throws SyncConnectionException, IOException {
+  public void syncAll() throws SyncConnectionException, IOException, TException {
 
-    //1. Clear old snapshots if necessary
-    for (String snapshotPath : config.getSnapshotPaths()) {
-      if (new File(snapshotPath).exists() && new File(snapshotPath).list().length != 0) {
-        // It means that the last task of sync does not succeed! Clear the files and start to sync again
-        FileUtils.deleteDirectory(new File(snapshotPath));
-      }
-    }
-
-    // 2. Acquire valid files and check
-    syncFileManager.init();
-    validAllFiles = syncFileManager.getValidAllFiles();
-    currentLocalFiles = syncFileManager.getCurrentLocalFiles();
-    if (SyncUtils.isEmpty(validAllFiles)) {
-      logger.info("There has no file to sync !");
-      return;
-    }
-
-    // 3. Connect to sync server and Confirm Identity
+    // 1. Connect to sync receiver and confirm identity
     establishConnection(config.getServerIp(), config.getServerPort());
-    if (!confirmIdentity(config.getUuidPath())) {
+    if (!confirmIdentity(config.getSenderPath())) {
       logger.error("Sorry, you do not have the permission to connect to sync receiver.");
       System.exit(1);
     }
 
-    // 4. Create snapshot
-    for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) {
-      validFileSnapshot.put(entry.getKey(), makeFileSnapshot(entry.getValue()));
+    // 2. Sync Schema
+    syncSchema();
+
+    // 3. Sync all data
+    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    for (String dataDir : dataDirs) {
+      logger.info("Start to sync data in data dir {}", dataDir);
+      config.update(dataDir);
+      SyncFileManager.getInstance().getValidFiles(dataDir);
+      lastLocalFilesMap = SyncFileManager.getInstance().getLastLocalFilesMap();
+      deletedFilesMap = SyncFileManager.getInstance().getDeletedFilesMap();
+      toBeSyncedFilesMap = SyncFileManager.getInstance().getToBeSyncedFilesMap();
+      checkRecovery();
+      if (SyncUtils.isEmpty(deletedFilesMap) && SyncUtils.isEmpty(toBeSyncedFilesMap)) {
+        logger.info("There has no data to sync in data dir {}", dataDir);
+        continue;
+      }
+      sync();
+      logger.info("Finish to sync data in data dir {}", dataDir);
     }
 
-    syncStatus = true;
+    // 4. notify receiver that synchronization finish
+    // At this point the synchronization has finished even if connection fails
+    try {
+      serviceClient.endSync();
+      transport.close();
+      logger.info("Sync process has finished.");
+    } catch (TException e) {
+      logger.error("Unable to connect to receiver.", e);
+    }
+  }
 
+  /**
+   * Execute a sync task.
+   */
+  @Override
+  public void sync() throws IOException {
     try {
-      // 5. Sync schema
-      syncSchema();
+      syncStatus = true;
+      syncLog = new SyncSenderLogger(getSchemaLogFile());
+
+      // 1. Sync data
+      for (Entry<String, Set<File>> entry : deletedFilesMap.entrySet()) {
+        // TODO deal with the situation
+        try {
+          if (serviceClient.init(entry.getKey()) == ResultStatus.FAILURE) {
+            throw new SyncConnectionException("unable init receiver");
+          }
+        } catch (TException | SyncConnectionException e) {
+          throw new SyncConnectionException("Unable to connect to receiver", e);
+        }
+        logger.info("Sync process starts to transfer data of storage group {}", entry.getKey());
+        syncDeletedFilesName(entry.getKey(), entry.getValue());
+        syncDataFilesInOneGroup(entry.getKey(), entry.getValue());
+      }
 
-      // 6. Sync data
-      syncAllData();
-    } catch (SyncConnectionException e) {
+      // 2. Clear sync log
+      clearSyncLog();
+
+    } catch (SyncConnectionException | TException e) {
       logger.error("cannot finish sync process", e);
+    } finally {
+      if (syncLog != null) {
+        syncLog.close();
+      }
       syncStatus = false;
-      return;
     }
+  }
 
-    // 7. clear snapshot
-    for (String snapshotPath : config.getSnapshotPaths()) {
-      FileUtils.deleteDirectory(new File(snapshotPath));
-    }
+  private void checkRecovery() {
+
+  }
+
+  public void clearSyncLog() {
 
-    // 8. notify receiver that synchronization finish
-    // At this point the synchronization has finished even if connection fails
-    try {
-      serviceClient.cleanUp();
-    } catch (TException e) {
-      logger.error("Unable to connect to receiver.", e);
-    }
-    transport.close();
-    logger.info("Sync process has finished.");
-    syncStatus = false;
   }
 
   @Override
-  public void syncAllData() throws SyncConnectionException {
-    for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) {
-      Set<String> validFiles = entry.getValue();
-      Set<String> validSnapshot = validFileSnapshot.get(entry.getKey());
-      if (validSnapshot.isEmpty()) {
-        continue;
+  public void syncDeletedFilesName(String sgName, Set<File> deletedFilesName) {
+    if (deletedFilesName.isEmpty()) {
+      logger.info("There has no deleted files to be synced in storage group {}", sgName);
+      return;
+    }
+    logger.info("Start to sync names of deleted files in storage group {}", sgName);
+    for(File file:deletedFilesName){
+      try {
+        serviceClient.syncDeletedFileName(file.getName());
+      } catch (TException e) {
+        logger.error("Can not sync deleted file name {}, skip it.", file);
       }
-      logger.info("Sync process starts to transfer data of storage group {}", entry.getKey());
+    }
+    logger.info("Finish to sync names of deleted files in storage group {}", sgName);
+  }
+
+  @Override
+  public void syncDataFilesInOneGroup(String sgName, Set<File> toBeSyncFiles)
+      throws SyncConnectionException {
+    Set<String> validSnapshot = validFileSnapshot.get(sgName);
+    if (validSnapshot.isEmpty()) {
+      logger.info("There has no new tsfiles to be synced in storage group {}", sgName);
+      return;
+    }
+    logger.info("Sync process starts to transfer data of storage group {}", sgName);
+    int cnt = 0;
+    for (String tsfilePath : toBeSyncFiles) {
+      cnt++;
+      File snapshotFile = null;
       try {
-        if (!serviceClient.init(entry.getKey())) {
-          throw new SyncConnectionException("unable init receiver");
+        snapshotFile = makeFileSnapshot(tsfilePath);
+        syncSingleFile(snapshotFile);
+        logger.info("Task of synchronization has completed {}/{}.", cnt, toBeSyncFiles.size());
+      } catch (IOException e) {
+        logger.info(
+            "Tsfile {} can not make snapshot, so skip the tsfile and continue to sync other tsfiles",
+            tsfilePath, e);
+      } finally {
+        if(snapshotFile != null) {
+          snapshotFile.deleteOnExit();
         }
-      } catch (TException e) {
-        throw new SyncConnectionException("Unable to connect to receiver", e);
       }
-      syncData(validSnapshot);
-      if (afterSynchronization()) {
-        currentLocalFiles.get(entry.getKey()).addAll(validFiles);
-        syncFileManager.setCurrentLocalFiles(currentLocalFiles);
-        syncFileManager.backupNowLocalFileInfo(config.getLastFileInfo());
-        logger.info("Sync process has finished storage group {}.", entry.getKey());
-      } else {
-        logger.error("Receiver cannot sync data, abandon this synchronization of storage group {}",
-            entry.getKey());
+    }
+    logger.info("Sync process has finished storage group {}.", sgName);
+  }
+
+  private File makeFileSnapshot(String filePath) throws IOException {
+    String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath);
+    File newFile = new File(snapshotFilePath);
+    if (!newFile.getParentFile().exists()) {
+      newFile.getParentFile().mkdirs();
+    }
+    Path link = FileSystems.getDefault().getPath(snapshotFilePath);
+    Path target = FileSystems.getDefault().getPath(filePath);
+    Files.createLink(link, target);
+    return newFile;
+  }
+
+
+  /**
+   * Transfer data of a storage group to receiver.
+   *
+   */
+  private void syncSingleFile(File snapshotFile) throws SyncConnectionException {
+    try {
+      int retryCount = 0;
+      MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
+      outer:
+      while (true) {
+        retryCount++;
+        if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
+          throw new SyncConnectionException(String
+              .format("Can not sync file %s after %s tries.", snapshotFile.getAbsoluteFile(),
+                  Constans.MAX_SYNC_FILE_TRY));
+        }
+        md.reset();
+        byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
+        int dataLength;
+        try (FileInputStream fis = new FileInputStream(snapshotFile);
+            ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
+          while ((dataLength = fis.read(buffer)) != -1) { // cut the file into pieces to send
+            bos.write(buffer, 0, dataLength);
+            md.update(buffer, 0, dataLength);
+            ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+            bos.reset();
+            if (!Boolean.parseBoolean(serviceClient
+                .syncData(null, snapshotFile.getName(), buffToSend,
+                    SyncDataStatus.PROCESSING_STATUS))) {
+              logger.info("Receiver failed to receive data from {}, retry.",
+                  snapshotFile.getAbsoluteFile());
+              continue outer;
+            }
+          }
+        }
+
+        // the file is sent successfully
+        String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
+        String md5OfReceiver = serviceClient.syncData(md5OfSender, snapshotFile.getName(),
+            null, SyncDataStatus.FINISH_STATUS);
+        if (md5OfSender.equals(md5OfReceiver)) {
+          logger.info("Receiver has received {} successfully.", snapshotFile.getAbsoluteFile());
+          break;
+        }
       }
+    } catch (IOException e) {
+      throw new SyncConnectionException("Cannot sync data with receiver.", e);
     }
   }
 
+
   /**
    * Establish a connection between the sender and the receiver.
    *
@@ -271,7 +380,6 @@ public class DataTransferManager implements IDataTransferManager {
     try {
       transport.open();
     } catch (TTransportException e) {
-      syncStatus = false;
       logger.error("Cannot connect to server");
       throw new SyncConnectionException(e);
     }
@@ -317,19 +425,18 @@ public class DataTransferManager implements IDataTransferManager {
   }
 
   private String generateUUID() {
-    return Constans.SYNC_CLIENT + UUID.randomUUID().toString().replaceAll("-", "");
+    return Constans.SYNC_SENDER + UUID.randomUUID().toString().replaceAll("-", "");
   }
 
   /**
    * Create snapshots for valid files.
    */
   @Override
-  public Set<String> makeFileSnapshot(Set<String> validFiles) throws IOException {
+  public Set<String> makeFileSnapshot(Set<String> validFiles) {
     Set<String> validFilesSnapshot = new HashSet<>();
-    try {
-      for (String filePath : validFiles) {
+    for (String filePath : validFiles) {
+      try {
         String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath);
-        validFilesSnapshot.add(snapshotFilePath);
         File newFile = new File(snapshotFilePath);
         if (!newFile.getParentFile().exists()) {
           newFile.getParentFile().mkdirs();
@@ -337,144 +444,126 @@ public class DataTransferManager implements IDataTransferManager {
         Path link = FileSystems.getDefault().getPath(snapshotFilePath);
         Path target = FileSystems.getDefault().getPath(filePath);
         Files.createLink(link, target);
+        validFilesSnapshot.add(snapshotFilePath);
+      } catch (IOException e) {
+        logger.error("Can not make snapshot for file {}", filePath);
       }
-    } catch (IOException e) {
-      logger.error("Can not make fileSnapshot");
-      throw new IOException(e);
     }
     return validFilesSnapshot;
   }
 
   /**
-   * Transfer data of a storage group to receiver.
-   *
-   * @param fileSnapshotList list of sending snapshot files in a storage group.
-   */
-  public void syncData(Set<String> fileSnapshotList) throws SyncConnectionException {
-    try {
-      int successNum = 0;
-      for (String snapshotFilePath : fileSnapshotList) {
-        successNum++;
-        File file = new File(snapshotFilePath);
-        List<String> filePathSplit = new ArrayList<>();
-        String os = System.getProperty("os.name");
-        if (os.toLowerCase().startsWith("windows")) {
-          String[] name = snapshotFilePath.split(File.separator + File.separator);
-          filePathSplit.add(name[name.length - 2]);
-          filePathSplit.add(name[name.length - 1]);
-        } else {
-          String[] name = snapshotFilePath.split(File.separator);
-          filePathSplit.add(name[name.length - 2]);
-          filePathSplit.add(name[name.length - 1]);
-        }
-        int retryCount = 0;
-        // Get md5 of the file.
-        MessageDigest md = MessageDigest.getInstance("MD5");
-        outer:
-        while (true) {
-          retryCount++;
-          // Sync all data to receiver
-          if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
-            throw new SyncConnectionException(String
-                .format("can not sync file %s after %s tries.", snapshotFilePath,
-                    Constans.MAX_SYNC_FILE_TRY));
-          }
-          md.reset();
-          byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
-          int dataLength;
-          try (FileInputStream fis = new FileInputStream(file);
-              ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
-            while ((dataLength = fis.read(buffer)) != -1) { // cut the file into pieces to send
-              bos.write(buffer, 0, dataLength);
-              md.update(buffer, 0, dataLength);
-              ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
-              bos.reset();
-              if (!Boolean.parseBoolean(serviceClient
-                  .syncData(null, filePathSplit, buffToSend, SyncDataStatus.PROCESSING_STATUS))) {
-                logger.info("Receiver failed to receive data from {}, retry.", snapshotFilePath);
-                continue outer;
-              }
-            }
-          }
-
-          // the file is sent successfully
-          String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
-          String md5OfReceiver = serviceClient.syncData(md5OfSender, filePathSplit,
-              null, SyncDataStatus.FINISH_STATUS);
-          if (md5OfSender.equals(md5OfReceiver)) {
-            logger.info("Receiver has received {} successfully.", snapshotFilePath);
-            break;
-          }
-        }
-        logger.info(String.format("Task of synchronization has completed %d/%d.", successNum,
-            fileSnapshotList.size()));
-      }
-    } catch (Exception e) {
-      throw new SyncConnectionException("Cannot sync data with receiver.", e);
-    }
-  }
-
-  /**
    * Sync schema with receiver.
    */
   @Override
-  public void syncSchema() throws SyncConnectionException {
+  public void syncSchema() throws SyncConnectionException, TException {
     int retryCount = 0;
-    outer:
+    serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
     while (true) {
-      retryCount++;
       if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
         throw new SyncConnectionException(String
-            .format("can not sync schema after %s tries.", Constans.MAX_SYNC_FILE_TRY));
+            .format("Can not sync schema after %s tries.", Constans.MAX_SYNC_FILE_TRY));
       }
-      byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
-      try (FileInputStream fis = new FileInputStream(new File(config.getSchemaPath()));
-          ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
-        // Get md5 of the file.
-        MessageDigest md = MessageDigest.getInstance("MD5");
-        int dataLength;
-        while ((dataLength = fis.read(buffer)) != -1) { // cut the file into pieces to send
-          bos.write(buffer, 0, dataLength);
-          md.update(buffer, 0, dataLength);
+      try {
+        if (tryToSyncSchema()) {
+          writeSyncSchemaPos(getSchemaPosFile());
+          break;
+        }
+      } finally {
+        retryCount++;
+      }
+    }
+  }
+
+  private boolean tryToSyncSchema() {
+    int schemaPos = readSyncSchemaPos(getSchemaPosFile());
+
+    // start to sync file data and get md5 of this file.
+    try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
+      schemaFileLinePos = 0;
+      while (schemaFileLinePos++ <= schemaPos) {
+        br.readLine();
+      }
+      MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
+      String line;
+      int cntLine = 0;
+      while ((line = br.readLine()) != null) {
+        schemaFileLinePos++;
+        byte[] singleLineData = BytesUtils.stringToBytes(line);
+        bos.write(singleLineData);
+        md.update(singleLineData);
+        if (cntLine++ == BATCH_LINE) {
           ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
           bos.reset();
           // PROCESSING_STATUS represents there is still schema buffer to send.
-          if (!Boolean.parseBoolean(
-              serviceClient.syncSchema(null, buffToSend, SyncDataStatus.PROCESSING_STATUS))) {
+          if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
             logger.error("Receiver failed to receive metadata, retry.");
-            continue outer;
+            return false;
           }
+          cntLine = 0;
         }
-        bos.close();
-        String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
-        String md5OfReceiver = serviceClient
-            .syncSchema(md5OfSender, null, SyncDataStatus.FINISH_STATUS);
-        if (md5OfSender.equals(md5OfReceiver)) {
-          logger.info("Receiver has received schema successfully.");
-          /** receiver start to load metadata **/
-          if (Boolean
-              .parseBoolean(serviceClient.syncSchema(null, null, SyncDataStatus.SUCCESS_STATUS))) {
-            throw new SyncConnectionException("Receiver failed to load metadata");
-          }
-          break;
+      }
+      if (bos.size() != 0) {
+        ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+        bos.reset();
+        if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
+          logger.error("Receiver failed to receive metadata, retry.");
+          return false;
         }
-      } catch (Exception e) {
-        logger.error("Cannot sync schema ", e);
-        throw new SyncConnectionException(e);
       }
+
+      // check md5
+      return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16));
+    } catch (NoSuchAlgorithmException | IOException | TException e) {
+      logger.error("Can not finish transfer schema to receiver", e);
+      return false;
     }
   }
 
-  @Override
-  public boolean afterSynchronization() throws SyncConnectionException {
-    boolean successOrNot;
+  private File getSchemaPosFile() {
+    return new File(config.getSenderPath(), Constans.SCHEMA_POS_FILE_NAME)
+  }
+
+  private File getSchemaLogFile() {
+    return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
+        MetadataConstant.METADATA_LOG);
+  }
+
+  private boolean checkMD5ForSchema(String md5OfSender) throws TException {
+    String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
+    if (md5OfSender.equals(md5OfReceiver)) {
+      logger.info("Receiver has received schema successfully, retry.");
+      return true;
+    } else {
+      logger.error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
+      return false;
+    }
+  }
+
+  private int readSyncSchemaPos(File syncSchemaLogFile) {
     try {
-      successOrNot = serviceClient.load();
-    } catch (TException e) {
-      throw new SyncConnectionException(
-          "Can not finish sync process because sync receiver has broken down.", e);
+      if (syncSchemaLogFile.exists()) {
+        try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
+          return Integer.parseInt(br.readLine());
+        }
+      }
+    } catch (IOException e) {
+      logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
+    }
+    return 0;
+  }
+
+  private void writeSyncSchemaPos(File syncSchemaLogFile) {
+    try {
+      if (syncSchemaLogFile.exists()) {
+        try (BufferedWriter br = new BufferedWriter(new FileWriter(syncSchemaLogFile))) {
+          br.write(Integer.toString(schemaFileLinePos));
+        }
+      }
+    } catch (IOException e) {
+      logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
     }
-    return successOrNot;
   }
 
   /**
@@ -490,7 +579,7 @@ public class DataTransferManager implements IDataTransferManager {
       lockFile.createNewFile();
     }
     if (!lockInstance(config.getLockFilePath())) {
-      logger.error("Sync client is running.");
+      logger.error("Sync client is already running.");
       System.exit(1);
     }
   }
@@ -498,9 +587,9 @@ public class DataTransferManager implements IDataTransferManager {
   /**
    * Try to lock lockfile. if failed, it means that sync client has benn started.
    *
-   * @param lockFile path of lockfile
+   * @param lockFile path of lock file
    */
-  private static boolean lockInstance(final String lockFile) {
+  private boolean lockInstance(final String lockFile) {
     try {
       final File file = new File(lockFile);
       final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
@@ -510,7 +599,6 @@ public class DataTransferManager implements IDataTransferManager {
           try {
             fileLock.release();
             randomAccessFile.close();
-            FileUtils.forceDelete(file);
           } catch (Exception e) {
             logger.error("Unable to remove lock file: {}", lockFile, e);
           }
@@ -528,11 +616,15 @@ public class DataTransferManager implements IDataTransferManager {
     private static final DataTransferManager INSTANCE = new DataTransferManager();
   }
 
-  public void setConfig(SyncSenderConfig config) {
-    this.config = config;
+  private File getSyncLogFile() {
+    return new File(config.getSenderPath(), Constans.SYNC_LOG_NAME);
+  }
+
+  private File getCurrentLogFile() {
+    return new File(config.getSenderPath(), Constans.CURRENT_SYNC_LOG_NAME);
   }
 
-  public List<String> getSchema() {
-    return schema;
+  public void setConfig(SyncSenderConfig config) {
+    DataTransferManager.config = config;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
index 3941a90..82b0d00 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
@@ -18,9 +18,11 @@
  */
 package org.apache.iotdb.db.sync.sender.transfer;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Set;
 import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.thrift.TException;
 
 /**
  * SyncSender defines the methods of a sender in sync module.
@@ -45,22 +47,20 @@ public interface IDataTransferManager {
   /**
    * Make file snapshots before sending files.
    */
-  Set<String> makeFileSnapshot(Set<String> validFiles) throws IOException;
+  Set<String> makeFileSnapshot(Set<String> validFiles);
 
   /**
    * Send schema file to receiver.
    */
-  void syncSchema() throws SyncConnectionException;
+  void syncSchema() throws SyncConnectionException, TException;
 
-  /**
-   * For all valid files, send it to receiver side and load these data in receiver.
-   */
-  void syncAllData() throws SyncConnectionException;
+  void syncDeletedFilesName(String sgName, Set<File> deletedFilesName)
+      throws SyncConnectionException;
 
   /**
-   * Close the socket after sending files.
+   * For all valid files, send it to receiver side and load these data in receiver.
    */
-  boolean afterSynchronization() throws SyncConnectionException;
+  void syncDataFilesInOneGroup(String sgName, Set<File> deletedFilesName) throws SyncConnectionException;
 
   /**
    * Execute a sync task.
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
index 5a056a9..8e7a9e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
@@ -29,9 +29,6 @@ public class SyncUtils {
 
   private static final String IP_SEPARATOR = "\\.";
 
-  private static String[] snapshotPaths = SyncSenderDescriptor.getInstance()
-      .getConfig().getSnapshotPaths();
-
   private SyncUtils() {
   }
 
@@ -41,40 +38,20 @@ public class SyncUtils {
    * sender.
    */
   public static String getSnapshotFilePath(String filePath) {
-    String[] name;
-    String relativeFilePath;
-    String os = System.getProperty("os.name");
-    if (os.toLowerCase().startsWith("windows")) {
-      name = filePath.split(File.separator + File.separator);
-      relativeFilePath = name[name.length - 2] + File.separator + name[name.length - 1];
-    } else {
-      name = filePath.split(File.separator);
-      relativeFilePath = name[name.length - 2] + File.separator + name[name.length - 1];
-    }
-    String bufferWritePath = name[0];
-    for (int i = 1; i < name.length - 2; i++) {
-      bufferWritePath = bufferWritePath + File.separatorChar + name[i];
-    }
-    for (String snapshotPath : snapshotPaths) {
-      if (snapshotPath.startsWith(bufferWritePath)) {
-        if (!new File(snapshotPath).exists()) {
-          new File(snapshotPath).mkdir();
-        }
-        if (snapshotPath.length() > 0
-            && snapshotPath.charAt(snapshotPath.length() - 1) != File.separatorChar) {
-          snapshotPath = snapshotPath + File.separatorChar;
-        }
-        return snapshotPath + relativeFilePath;
-      }
+    String relativeFilePath =
+        new File(filePath).getParent() + File.separator + new File(filePath).getName();
+    String snapshotDir = SyncSenderDescriptor.getInstance().getConfig().getSnapshotPath();
+    if (!new File(snapshotDir).exists()) {
+      new File(snapshotDir).mkdirs();
     }
-    return null;
+    return new File(snapshotDir, relativeFilePath).getAbsolutePath();
   }
 
   /**
    * Verify sending list is empty or not It's used by sync sender.
    */
-  public static boolean isEmpty(Map<String, Set<String>> sendingFileList) {
-    for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) {
+  public static boolean isEmpty(Map<String, Set<File>> sendingFileList) {
+    for (Entry<String, Set<File>> entry : sendingFileList.entrySet()) {
       if (!entry.getValue().isEmpty()) {
         return false;
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
index ffb1f2b..7a06281 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
@@ -147,7 +147,7 @@ public class SingleClientSyncTest {
       "insert into root.test.d0(timestamp,s1) values(3000,'1309')",
       "insert into root.test.d1.g0(timestamp,s0) values(400,1050)", "merge", "flush",};
   private boolean testFlag = Constant.testFlag;
-  private static final String SYNC_CLIENT = Constans.SYNC_CLIENT;
+  private static final String SYNC_CLIENT = Constans.SYNC_SENDER;
   private static final Logger logger = LoggerFactory.getLogger(SingleClientSyncTest.class);
 
   public static void main(String[] args) throws Exception {
@@ -159,7 +159,7 @@ public class SingleClientSyncTest {
   }
 
   public void setConfig() {
-    config.setUuidPath(
+    config.setSenderPath(
         config.getDataDirectory() + SYNC_CLIENT + File.separator + Constans.UUID_FILE_NAME);
     config.setLastFileInfo(
         config.getDataDirectory() + SYNC_CLIENT + File.separator + Constans.LAST_LOCAL_FILE_NAME);
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
index 4c8356a..322dfe0 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 
 public class SyncFileManagerTest {
 
-  private static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC_CLIENT + File.separator;
+  private static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC_SENDER + File.separator;
   private static final String LAST_FILE_INFO_TEST =
       POST_BACK_DIRECTORY_TEST + Constans.LAST_LOCAL_FILE_NAME;
   private static final String SENDER_FILE_PATH_TEST = POST_BACK_DIRECTORY_TEST + "data";
diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift
index fec6079..6fc394a 100755
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/service-rpc/src/main/thrift/sync.thrift
@@ -22,17 +22,18 @@ typedef i32 int
 typedef i16 short
 typedef i64 long
 
-enum SyncDataStatus {
-  SUCCESS_STATUS,
-  FINISH_STATUS,
-  PROCESSING_STATUS
+enum ResultStatus {
+  SUCCESS,
+  FAILURE,
+  BUSY
 }
 
 service SyncService{
-	bool checkIdentity(1:string uuid, 2:string address)
-	string syncSchema(1:string md5, 2:binary buff, 3:SyncDataStatus status)
-	string syncData(1:string md5, 2:list<string> filename, 3:binary buff, 4:SyncDataStatus status)
-	bool load()
-	void cleanUp()
-	bool init(1:string storageGroupName)
+	ResultStatus checkIdentity(1:string address)
+	ResultStatus init(1:string storageGroupName)
+	ResultStatus syncDeletedFileName(1:string fileName)
+	ResultStatus initSyncData(1:string filename)
+	ResultStatus syncData(1:binary buff)
+	string checkDataMD5(1:string md5)
+	void endSync()
 }
\ No newline at end of file