You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/08/22 03:14:11 UTC

[incubator-iotdb] branch reimpl_sync created (now 2a67544)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 2a67544  merge master

This branch includes the following new commits:

     new 85cd778  add sync new code framework
     new ab86f89  complete manage module
     new 7b57574c complete all sender module
     new b15ec6b  complete sync sender module
     new 2a67544  merge master

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 03/05: complete all sender module

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7b57574ce0e47b6457d2cfe6de086f0d30bba029
Author: lta <li...@163.com>
AuthorDate: Wed Aug 21 17:03:52 2019 +0800

    complete all sender module
---
 .../receiver/transfer/SyncServiceImplBackup.java   | 736 ---------------------
 .../apache/iotdb/db/sync/sender/conf/Constans.java |   2 +-
 .../db/sync/sender/manage/ISyncFileManager.java    |   4 -
 .../db/sync/sender/manage/SyncFileManager.java     |   9 -
 .../sender/recover/ISyncSenderLogAnalyzer.java     |   6 +-
 .../db/sync/sender/recover/ISyncSenderLogger.java  |   8 -
 .../sync/sender/recover/SyncSenderLogAnalyzer.java | 114 ++++
 .../db/sync/sender/recover/SyncSenderLogger.java   |  50 +-
 .../sync/sender/transfer/DataTransferManager.java  | 276 ++++----
 .../sync/sender/transfer/IDataTransferManager.java |  10 +-
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  |   7 +-
 11 files changed, 255 insertions(+), 967 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java
deleted file mode 100644
index 6f2f754..0000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java
+++ /dev/null
@@ -1,736 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied.  See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// */
-//package org.apache.iotdb.db.sync.receiver.transfer;
-//
-//import java.io.BufferedReader;
-//import java.io.File;
-//import java.io.FileInputStream;
-//import java.io.FileNotFoundException;
-//import java.io.FileOutputStream;
-//import java.io.IOException;
-//import java.math.BigInteger;
-//import java.nio.ByteBuffer;
-//import java.nio.channels.FileChannel;
-//import java.security.MessageDigest;
-//import java.util.ArrayList;
-//import java.util.Collections;
-//import java.util.HashMap;
-//import java.util.HashSet;
-//import java.util.Iterator;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.Map.Entry;
-//import java.util.Set;
-//import org.apache.commons.io.FileUtils;
-//import org.apache.commons.lang3.StringUtils;
-//import org.apache.iotdb.db.concurrent.ThreadName;
-//import org.apache.iotdb.db.conf.IoTDBConfig;
-//import org.apache.iotdb.db.conf.IoTDBDescriptor;
-//import org.apache.iotdb.db.conf.directories.DirectoryManager;
-//import org.apache.iotdb.db.engine.StorageEngine;
-//import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-//import org.apache.iotdb.db.exception.MetadataErrorException;
-//import org.apache.iotdb.db.exception.PathErrorException;
-//import org.apache.iotdb.db.exception.ProcessorException;
-//import org.apache.iotdb.db.exception.StorageEngineException;
-//import org.apache.iotdb.db.metadata.MManager;
-//import org.apache.iotdb.db.metadata.MetadataConstant;
-//import org.apache.iotdb.db.metadata.MetadataOperationType;
-//import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
-//import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-//import org.apache.iotdb.db.sync.sender.conf.Constans;
-//import org.apache.iotdb.db.utils.FilePathUtils;
-//import org.apache.iotdb.db.utils.SyncUtils;
-//import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
-//import org.apache.iotdb.service.sync.thrift.SyncService;
-//import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-//import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-//import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-//import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
-//import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-//import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-//import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-//import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
-//import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-//import org.apache.iotdb.tsfile.read.common.Field;
-//import org.apache.iotdb.tsfile.read.common.Path;
-//import org.apache.iotdb.tsfile.read.common.RowRecord;
-//import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-//import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//public class SyncServiceImplBackup implements SyncService.Iface {
-//
-//  private static final Logger logger = LoggerFactory.getLogger(SyncServiceImplBackup.class);
-//
-//  private static final StorageEngine STORAGE_GROUP_MANAGER = StorageEngine.getInstance();
-//  /**
-//   * Metadata manager
-//   **/
-//  private static final MManager metadataManger = MManager.getInstance();
-//
-//  private static final String SYNC_SERVER = Constans.SYNC_RECEIVER;
-//
-//  private ThreadLocal<String> uuid = new ThreadLocal<>();
-//  /**
-//   * String means storage group,List means the set of new files(path) in local IoTDB and String
-//   * means path of new Files
-//   **/
-//  private ThreadLocal<Map<String, List<String>>> fileNodeMap = new ThreadLocal<>();
-//  /**
-//   * Map String1 means timeseries String2 means path of new Files, long means startTime
-//   **/
-//  private ThreadLocal<Map<String, Map<String, Long>>> fileNodeStartTime = new ThreadLocal<>();
-//  /**
-//   * Map String1 means timeseries String2 means path of new Files, long means endTime
-//   **/
-//  private ThreadLocal<Map<String, Map<String, Long>>> fileNodeEndTime = new ThreadLocal<>();
-//
-//  /**
-//   * Total num of files that needs to be loaded
-//   */
-//  private ThreadLocal<Integer> fileNum = new ThreadLocal<>();
-//
-//  /**
-//   * IoTDB config
-//   **/
-//  private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-//
-//  /**
-//   * IoTDB data directory
-//   **/
-//  private String baseDir = config.getBaseDir();
-//
-//  /**
-//   * IoTDB  multiple bufferWrite directory
-//   **/
-//  private String[] bufferWritePaths = config.getDataDirs();
-//
-//  /**
-//   * The path to store metadata file of sender
-//   */
-//  private ThreadLocal<String> schemaFromSenderPath = new ThreadLocal<>();
-//
-//  /**
-//   * Sync folder path of server
-//   **/
-//  private String syncFolderPath;
-//
-//  /**
-//   * Sync data path of server
-//   */
-//  private String syncDataPath;
-//
-//  /**
-//   * Init threadLocal variable and delete old useless files.
-//   */
-//  @Override
-//  public boolean init(String storageGroup) {
-//    logger.info("Sync process starts to receive data of storage group {}", storageGroup);
-//    fileNum.set(0);
-//    fileNodeMap.set(new HashMap<>());
-//    fileNodeStartTime.set(new HashMap<>());
-//    fileNodeEndTime.set(new HashMap<>());
-//    try {
-//      FileUtils.deleteDirectory(new File(syncDataPath));
-//    } catch (IOException e) {
-//      logger.error("cannot delete directory {} ", syncFolderPath);
-//      return false;
-//    }
-//    for (String bufferWritePath : bufferWritePaths) {
-//      bufferWritePath = FilePathUtils.regularizePath(bufferWritePath);
-//      String backupPath = bufferWritePath + SYNC_SERVER + File.separator;
-//      File backupDirectory = new File(backupPath, this.uuid.get());
-//      if (backupDirectory.exists() && backupDirectory.list().length != 0) {
-//        try {
-//          FileUtils.deleteDirectory(backupDirectory);
-//        } catch (IOException e) {
-//          logger.error("cannot delete directory {} ", syncFolderPath);
-//          return false;
-//        }
-//      }
-//    }
-//    return true;
-//  }
-//
-//  /**
-//   * Verify IP address of sender
-//   */
-//  @Override
-//  public boolean checkIdentity(String uuid, String ipAddress) {
-//    Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
-//    this.uuid.set(uuid);
-//    initPath();
-//    return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress);
-//  }
-//
-//  /**
-//   * Init file path and clear data if last sync process failed.
-//   */
-//  private void initPath() {
-//    baseDir = FilePathUtils.regularizePath(baseDir);
-//    syncFolderPath = baseDir + SYNC_SERVER + File.separatorChar + this.uuid.get();
-//    syncDataPath = syncFolderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME;
-//    schemaFromSenderPath
-//        .set(syncFolderPath + File.separator + MetadataConstant.METADATA_LOG);
-//  }
-//
-//  /**
-//   * Acquire schema from sender
-//   *
-//   * @param status: FINIFSH_STATUS, SUCCESS_STATUS or PROCESSING_STATUS. status = FINISH_STATUS :
-//   * finish receiving schema file, start to sync schema. status = PROCESSING_STATUS : the schema
-//   * file has not received completely.SUCCESS_STATUS: load metadata.
-//   */
-//  @Override
-//  public String syncSchema(String md5, ByteBuffer schema, SyncDataStatus status) {
-//    String md5OfReceiver = Boolean.toString(Boolean.TRUE);
-//    if (status == SyncDataStatus.SUCCESS_STATUS) {
-//      /** sync metadata, include storage group and timeseries **/
-//      return Boolean.toString(loadMetadata());
-//    } else if (status == SyncDataStatus.PROCESSING_STATUS) {
-//      File file = new File(schemaFromSenderPath.get());
-//      if (!file.getParentFile().exists()) {
-//        try {
-//          file.getParentFile().mkdirs();
-//          file.createNewFile();
-//        } catch (IOException e) {
-//          logger.error("Cannot make schema file {}.", file.getPath(), e);
-//          md5OfReceiver = Boolean.toString(Boolean.FALSE);
-//        }
-//      }
-//      try (FileOutputStream fos = new FileOutputStream(file, true);
-//          FileChannel channel = fos.getChannel()) {
-//        channel.write(schema);
-//      } catch (Exception e) {
-//        logger.error("Cannot insert data to file {}.", file.getPath(), e);
-//        md5OfReceiver = Boolean.toString(Boolean.FALSE);
-//      }
-//    } else {
-//      try (FileInputStream fis = new FileInputStream(schemaFromSenderPath.get())) {
-//        MessageDigest md = MessageDigest.getInstance("MD5");
-//        byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
-//        int n;
-//        while ((n = fis.read(buffer)) != -1) {
-//          md.update(buffer, 0, n);
-//        }
-//        md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
-//        if (!md5.equals(md5OfReceiver)) {
-//          FileUtils.forceDelete(new File(schemaFromSenderPath.get()));
-//        }
-//      } catch (Exception e) {
-//        logger.error("Receiver cannot generate md5 {}", schemaFromSenderPath.get(), e);
-//      }
-//    }
-//    return md5OfReceiver;
-//  }
-//
-//  /**
-//   * Load metadata from sender
-//   */
-//  private boolean loadMetadata() {
-//    if (new File(schemaFromSenderPath.get()).exists()) {
-//      try (BufferedReader br = new BufferedReader(
-//          new java.io.FileReader(schemaFromSenderPath.get()))) {
-//        String metadataOperation;
-//        while ((metadataOperation = br.readLine()) != null) {
-//          operation(metadataOperation);
-//        }
-//      } catch (FileNotFoundException e) {
-//        logger.error("Cannot read the file {}.",
-//            schemaFromSenderPath.get(), e);
-//        return false;
-//      } catch (IOException e) {
-//        /** multiple insert schema, ignore it **/
-//      } catch (Exception e) {
-//        logger.error("Parse metadata operation failed.", e);
-//        return false;
-//      }
-//    }
-//    return true;
-//  }
-//
-//  /**
-//   * Operate metadata operation in MManager
-//   *
-//   * @param cmd metadata operation
-//   */
-//  private void operation(String cmd)
-//      throws PathErrorException, IOException, MetadataErrorException {
-//    String[] args = cmd.trim().split(",");
-//    switch (args[0]) {
-//      case MetadataOperationType.ADD_PATH_TO_MTREE:
-//        Map<String, String> props;
-//        String[] kv;
-//        props = new HashMap<>(args.length - 5 + 1, 1);
-//        for (int k = 5; k < args.length; k++) {
-//          kv = args[k].split("=");
-//          props.put(kv[0], kv[1]);
-//        }
-//        metadataManger.addPathToMTree(new Path(args[1]), TSDataType.deserialize(Short.valueOf(args[2])),
-//            TSEncoding.deserialize(Short.valueOf(args[3])),
-//            CompressionType.deserialize(Short.valueOf(args[4])),
-//            props);
-//        break;
-//      case MetadataOperationType.DELETE_PATH_FROM_MTREE:
-//        metadataManger.deletePaths(Collections.singletonList(new Path(args[1])));
-//        break;
-//      case MetadataOperationType.SET_STORAGE_LEVEL_TO_MTREE:
-//        metadataManger.setStorageLevelToMTree(args[1]);
-//        break;
-//      case MetadataOperationType.ADD_A_PTREE:
-//        metadataManger.addAPTree(args[1]);
-//        break;
-//      case MetadataOperationType.ADD_A_PATH_TO_PTREE:
-//        metadataManger.addPathToPTree(args[1]);
-//        break;
-//      case MetadataOperationType.DELETE_PATH_FROM_PTREE:
-//        metadataManger.deletePathFromPTree(args[1]);
-//        break;
-//      case MetadataOperationType.LINK_MNODE_TO_PTREE:
-//        metadataManger.linkMNodeToPTree(args[1], args[2]);
-//        break;
-//      case MetadataOperationType.UNLINK_MNODE_FROM_PTREE:
-//        metadataManger.unlinkMNodeFromPTree(args[1], args[2]);
-//        break;
-//      default:
-//        logger.error("Unrecognizable command {}", cmd);
-//    }
-//  }
-//
-//  /**
-//   * Start receiving tsfile from sender
-//   *
-//   * @param status status = SUCCESS_STATUS : finish receiving one tsfile status = PROCESSING_STATUS
-//   * : tsfile has not received completely.
-//   */
-//  @Override
-//  public String syncData(String md5OfSender, List<String> filePathSplit,
-//      ByteBuffer dataToReceive, SyncDataStatus status) {
-//    String md5OfReceiver = Boolean.toString(Boolean.TRUE);
-//    FileChannel channel;
-//    /** Recombination File Path **/
-//    String filePath = StringUtils.join(filePathSplit, File.separatorChar);
-//    syncDataPath = FilePathUtils.regularizePath(syncDataPath);
-//    filePath = syncDataPath + filePath;
-//    if (status == SyncDataStatus.PROCESSING_STATUS) { // there are still data stream to add
-//      File file = new File(filePath);
-//      if (!file.getParentFile().exists()) {
-//        try {
-//          file.getParentFile().mkdirs();
-//          file.createNewFile();
-//        } catch (IOException e) {
-//          logger.error("cannot make file {}", file.getPath(), e);
-//          md5OfReceiver = Boolean.toString(Boolean.FALSE);
-//        }
-//      }
-//      try (FileOutputStream fos = new FileOutputStream(file, true)) {// append new data
-//        channel = fos.getChannel();
-//        channel.write(dataToReceive);
-//      } catch (IOException e) {
-//        logger.error("cannot insert data to file {}", file.getPath(), e);
-//        md5OfReceiver = Boolean.toString(Boolean.FALSE);
-//
-//      }
-//    } else { // all data in the same file has received successfully
-//      try (FileInputStream fis = new FileInputStream(filePath)) {
-//        MessageDigest md = MessageDigest.getInstance("MD5");
-//        byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
-//        int n;
-//        while ((n = fis.read(buffer)) != -1) {
-//          md.update(buffer, 0, n);
-//        }
-//        md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
-//        if (md5OfSender.equals(md5OfReceiver)) {
-//          fileNum.set(fileNum.get() + 1);
-//
-//          logger.info(String.format("Receiver has received %d files from sender", fileNum.get()));
-//        } else {
-//          FileUtils.forceDelete(new File(filePath));
-//        }
-//      } catch (Exception e) {
-//        logger.error("Receiver cannot generate md5 {}", filePath, e);
-//      }
-//    }
-//    return md5OfReceiver;
-//  }
-//
-//
-//  @Override
-//  public boolean load() {
-//    try {
-//      getFileNodeInfo();
-//      loadData();
-//    } catch (Exception e) {
-//      logger.error("fail to load data", e);
-//      return false;
-//    }
-//    return true;
-//  }
-//
-//  /**
-//   * Get all tsfiles' info which are sent from sender, it is preparing for merging these data
-//   */
-//  public void getFileNodeInfo() throws IOException {
-//    File dataFileRoot = new File(syncDataPath);
-//    File[] files = dataFileRoot.listFiles();
-//    int processedNum = 0;
-//    for (File storageGroupPB : files) {
-//      List<String> filesPath = new ArrayList<>();
-//      File[] filesSG = storageGroupPB.listFiles();
-//      for (File fileTF : filesSG) { // fileTF means TsFiles
-//        Map<String, Long> startTimeMap = new HashMap<>();
-//        Map<String, Long> endTimeMap = new HashMap<>();
-//        TsFileSequenceReader reader = null;
-//        try {
-//          reader = new TsFileSequenceReader(fileTF.getPath());
-//          Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-//          Iterator<String> it = deviceIdMap.keySet().iterator();
-//          while (it.hasNext()) {
-//            String key = it.next();
-//            TsDeviceMetadataIndex device = deviceIdMap.get(key);
-//            startTimeMap.put(key, device.getStartTime());
-//            endTimeMap.put(key, device.getEndTime());
-//          }
-//        } catch (IOException e) {
-//          logger.error("Unable to read tsfile {}", fileTF.getPath());
-//          throw new IOException(e);
-//        } finally {
-//          try {
-//            if (reader != null) {
-//              reader.close();
-//            }
-//          } catch (IOException e) {
-//            logger.error("Cannot close tsfile stream {}", fileTF.getPath());
-//            throw new IOException(e);
-//          }
-//        }
-//        fileNodeStartTime.get().put(fileTF.getPath(), startTimeMap);
-//        fileNodeEndTime.get().put(fileTF.getPath(), endTimeMap);
-//        filesPath.add(fileTF.getPath());
-//        processedNum++;
-//        logger.info(String
-//            .format("Get tsfile info has complete : %d/%d", processedNum, fileNum.get()));
-//        fileNodeMap.get().put(storageGroupPB.getName(), filesPath);
-//      }
-//    }
-//  }
-//
-//
-//  /**
-//   * It is to merge data. If data in the tsfile is new, append the tsfile to the storage group
-//   * directly. If data in the tsfile is old, it has two strategy to merge.It depends on the
-//   * possibility of updating historical data.
-//   */
-//  public void loadData() throws StorageEngineException {
-//    syncDataPath = FilePathUtils.regularizePath(syncDataPath);
-//    int processedNum = 0;
-//    for (String storageGroup : fileNodeMap.get().keySet()) {
-//      List<String> filesPath = fileNodeMap.get().get(storageGroup);
-//      /**  before load external tsFile, it is necessary to order files in the same storage group **/
-//      Collections.sort(filesPath, (o1, o2) -> {
-//        Map<String, Long> startTimePath1 = fileNodeStartTime.get().get(o1);
-//        Map<String, Long> endTimePath2 = fileNodeEndTime.get().get(o2);
-//        for (Entry<String, Long> entry : endTimePath2.entrySet()) {
-//          if (startTimePath1.containsKey(entry.getKey())) {
-//            if (startTimePath1.get(entry.getKey()) > entry.getValue()) {
-//              return 1;
-//            } else {
-//              return -1;
-//            }
-//          }
-//        }
-//        return 0;
-//      });
-//
-//      for (String path : filesPath) {
-//        // get startTimeMap and endTimeMap
-//        Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path);
-//        Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path);
-//
-//        // create a new fileNode
-//        String header = syncDataPath;
-//        String relativePath = path.substring(header.length());
-//        TsFileResource fileNode = new TsFileResource(
-//            new File(DirectoryManager.getInstance().getNextFolderIndexForSequenceFile() +
-//                File.separator + relativePath), startTimeMap, endTimeMap
-//        );
-//        // call interface of load external file
-//        try {
-//          if (!STORAGE_GROUP_MANAGER.appendFileToStorageGroupProcessor(storageGroup, fileNode, path)) {
-//            // it is a file with unsequence data
-//            if (config.isUpdateHistoricalDataPossibility()) {
-//              loadOldData(path);
-//            } else {
-//              List<String> overlapFiles = STORAGE_GROUP_MANAGER.getOverlapFiles(
-//                  storageGroup,
-//                  fileNode, uuid.get());
-//              if (overlapFiles.isEmpty()) {
-//                loadOldData(path);
-//              } else {
-//                loadOldData(path, overlapFiles);
-//              }
-//            }
-//          }
-//        } catch (StorageEngineException | IOException | ProcessorException e) {
-//          logger.error("Can not load external file {}", path);
-//          throw new StorageEngineException(e);
-//        }
-//        processedNum++;
-//        logger.info(String
-//            .format("Merging files has completed : %d/%d", processedNum, fileNum.get()));
-//      }
-//    }
-//  }
-//
-//  /**
-//   * Insert all data in the tsfile into IoTDB.
-//   */
-//  public void loadOldData(String filePath) throws IOException, ProcessorException {
-//    Set<String> timeseriesSet = new HashSet<>();
-//    TsFileSequenceReader reader = null;
-//    QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
-//    try {
-//      /** use tsfile reader to get data **/
-//      reader = new TsFileSequenceReader(filePath);
-//      Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-//      Iterator<Entry<String, TsDeviceMetadataIndex>> entryIterator = deviceIdMap.entrySet()
-//          .iterator();
-//      while (entryIterator.hasNext()) {
-//        Entry<String, TsDeviceMetadataIndex> deviceMIEntry = entryIterator.next();
-//        String deviceId = deviceMIEntry.getKey();
-//        TsDeviceMetadataIndex deviceMI = deviceMIEntry.getValue();
-//        TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
-//        List<ChunkGroupMetaData> rowGroupMetadataList = deviceMetadata.getChunkGroupMetaDataList();
-//        timeseriesSet.clear();
-//        /** firstly, get all timeseries in the same device **/
-//        for (ChunkGroupMetaData chunkGroupMetaData : rowGroupMetadataList) {
-//          List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData
-//              .getChunkMetaDataList();
-//          for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
-//            String measurementUID = chunkMetaData.getMeasurementUid();
-//            measurementUID = deviceId + "." + measurementUID;
-//            timeseriesSet.add(measurementUID);
-//          }
-//        }
-//        /** Secondly, use tsFile Reader to form InsertPlan **/
-//        ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
-//        List<Path> paths = new ArrayList<>();
-//        paths.clear();
-//        for (String timeseries : timeseriesSet) {
-//          paths.add(new Path(timeseries));
-//        }
-//        QueryExpression queryExpression = QueryExpression.create(paths, null);
-//        QueryDataSet queryDataSet = readTsFile.query(queryExpression);
-//        while (queryDataSet.hasNext()) {
-//          RowRecord record = queryDataSet.next();
-//          List<Field> fields = record.getFields();
-//          List<String> measurementList = new ArrayList<>();
-//          List<String> insertValues = new ArrayList<>();
-//          for (int i = 0; i < fields.size(); i++) {
-//            Field field = fields.get(i);
-//            if (!field.isNull()) {
-//              measurementList.add(paths.get(i).getMeasurement());
-//              if (fields.get(i).getDataType() == TSDataType.TEXT) {
-//                insertValues.add(String.format("'%s'", field.toString()));
-//              } else {
-//                insertValues.add(String.format("%s", field.toString()));
-//              }
-//            }
-//          }
-//          if (insertExecutor.insert(new InsertPlan(deviceId, record.getTimestamp(),
-//              measurementList.toArray(new String[0]), insertValues.toArray(new String[0])))) {
-//            throw new IOException("Inserting series data to IoTDB engine has failed.");
-//          }
-//        }
-//      }
-//    } catch (IOException e) {
-//      logger.error("Can not parse tsfile into SQL", e);
-//      throw new IOException(e);
-//    } catch (ProcessorException e) {
-//      logger.error("Meet error while processing non-query.");
-//      throw new ProcessorException(e);
-//    } finally {
-//      try {
-//        if (reader != null) {
-//          reader.close();
-//        }
-//      } catch (IOException e) {
-//        logger.error("Cannot close file stream {}", filePath, e);
-//      }
-//    }
-//  }
-//
-//  /**
-//   * Insert those valid data in the tsfile into IoTDB
-//   *
-//   * @param overlapFiles:files which are conflict with the sync file
-//   */
-//  public void loadOldData(String filePath, List<String> overlapFiles)
-//      throws IOException, ProcessorException {
-//    Set<String> timeseriesList = new HashSet<>();
-//    QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
-//    Map<String, ReadOnlyTsFile> tsfilesReaders = openReaders(filePath, overlapFiles);
-//    try {
-//      TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
-//      Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-//      Iterator<String> it = deviceIdMap.keySet().iterator();
-//      while (it.hasNext()) {
-//        String deviceID = it.next();
-//        TsDeviceMetadataIndex deviceMI = deviceIdMap.get(deviceID);
-//        TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
-//        List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
-//            .getChunkGroupMetaDataList();
-//        timeseriesList.clear();
-//        /** firstly, get all timeseries in the same device **/
-//        for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
-//          List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData.getChunkMetaDataList();
-//          for (ChunkMetaData timeSeriesChunkMetaData : chunkMetaDataList) {
-//            String measurementUID = timeSeriesChunkMetaData.getMeasurementUid();
-//            measurementUID = deviceID + "." + measurementUID;
-//            timeseriesList.add(measurementUID);
-//          }
-//        }
-//        reader.close();
-//
-//        /** secondly, use tsFile Reader to form SQL **/
-//        ReadOnlyTsFile readOnlyTsFile = tsfilesReaders.get(filePath);
-//        List<Path> paths = new ArrayList<>();
-//        /** compare data with one timeseries in a round to get valid data **/
-//        for (String timeseries : timeseriesList) {
-//          paths.clear();
-//          paths.add(new Path(timeseries));
-//          Set<InsertPlan> originDataPoints = new HashSet<>();
-//          QueryExpression queryExpression = QueryExpression.create(paths, null);
-//          QueryDataSet queryDataSet = readOnlyTsFile.query(queryExpression);
-//          Set<InsertPlan> newDataPoints = convertToInserPlans(queryDataSet, paths, deviceID);
-//
-//          /** get all data with the timeseries in all overlap files. **/
-//          for (String overlapFile : overlapFiles) {
-//            ReadOnlyTsFile readTsFileOverlap = tsfilesReaders.get(overlapFile);
-//            QueryDataSet queryDataSetOverlap = readTsFileOverlap.query(queryExpression);
-//            originDataPoints.addAll(convertToInserPlans(queryDataSetOverlap, paths, deviceID));
-//          }
-//
-//          /** If there has no overlap data with the timeseries, inserting all data in the sync file **/
-//          if (originDataPoints.isEmpty()) {
-//            for (InsertPlan insertPlan : newDataPoints) {
-//              if (insertExecutor.insert(insertPlan)) {
-//                throw new IOException("Inserting series data to IoTDB engine has failed.");
-//              }
-//            }
-//          } else {
-//            /** Compare every data to get valid data **/
-//            for (InsertPlan insertPlan : newDataPoints) {
-//              if (!originDataPoints.contains(insertPlan)) {
-//                if (insertExecutor.insert(insertPlan)) {
-//                  throw new IOException("Inserting series data to IoTDB engine has failed.");
-//                }
-//              }
-//            }
-//          }
-//        }
-//      }
-//    } catch (IOException e) {
-//      logger.error("Can not parse tsfile into SQL", e);
-//      throw new IOException(e);
-//    } catch (ProcessorException e) {
-//      logger.error("Meet error while processing non-query.", e);
-//      throw new ProcessorException(e);
-//    } finally {
-//      try {
-//        closeReaders(tsfilesReaders);
-//      } catch (IOException e) {
-//        logger.error("Cannot close file stream {}", filePath, e);
-//      }
-//    }
-//  }
-//
-//  private Set<InsertPlan> convertToInserPlans(QueryDataSet queryDataSet, List<Path> paths, String deviceID) throws IOException {
-//    Set<InsertPlan> plans = new HashSet<>();
-//    while (queryDataSet.hasNext()) {
-//      RowRecord record = queryDataSet.next();
-//      List<Field> fields = record.getFields();
-//      /** get all data with the timeseries in the sync file **/
-//      for (int i = 0; i < fields.size(); i++) {
-//        Field field = fields.get(i);
-//        String[] measurementList = new String[1];
-//        if (!field.isNull()) {
-//          measurementList[0] = paths.get(i).getMeasurement();
-//          InsertPlan insertPlan = new InsertPlan(deviceID, record.getTimestamp(),
-//              measurementList, new String[]{field.getDataType() == TSDataType.TEXT ? String.format("'%s'", field.toString())
-//              : field.toString()});
-//          plans.add(insertPlan);
-//        }
-//      }
-//    }
-//    return plans;
-//  }
-//
-//  /**
-//   * Open all tsfile reader and cache
-//   */
-//  private Map<String, ReadOnlyTsFile> openReaders(String filePath, List<String> overlapFiles)
-//      throws IOException {
-//    Map<String, ReadOnlyTsFile> tsfileReaders = new HashMap<>();
-//    tsfileReaders.put(filePath, new ReadOnlyTsFile(new TsFileSequenceReader(filePath)));
-//    for (String overlapFile : overlapFiles) {
-//      tsfileReaders.put(overlapFile, new ReadOnlyTsFile(new TsFileSequenceReader(overlapFile)));
-//    }
-//    return tsfileReaders;
-//  }
-//
-//  /**
-//   * Close all tsfile reader
-//   */
-//  private void closeReaders(Map<String, ReadOnlyTsFile> readers) throws IOException {
-//    for (ReadOnlyTsFile tsfileReader : readers.values()) {
-//      tsfileReader.close();
-//    }
-//  }
-//
-//  /**
-//   * Release threadLocal variable resources
-//   */
-//  @Override
-//  public void cleanUp() {
-//    uuid.remove();
-//    fileNum.remove();
-//    fileNodeMap.remove();
-//    fileNodeStartTime.remove();
-//    fileNodeEndTime.remove();
-//    schemaFromSenderPath.remove();
-//    try {
-//      FileUtils.deleteDirectory(new File(syncFolderPath));
-//    } catch (IOException e) {
-//      logger.error("can not delete directory {}", syncFolderPath, e);
-//    }
-//    logger.info("Synchronization has finished!");
-//  }
-//
-//  public Map<String, List<String>> getFileNodeMap() {
-//    return fileNodeMap.get();
-//  }
-//
-//  public void setFileNodeMap(Map<String, List<String>> fileNodeMap) {
-//    this.fileNodeMap.set(fileNodeMap);
-//  }
-//
-//}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index 05b67c0..dfe3c52 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -30,9 +30,9 @@ public class Constans {
   public static final String LOCK_FILE_NAME = "sync_lock";
   public static final String SCHEMA_POS_FILE_NAME = "sync_schema_pos";
   public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
+  public static final String CURRENT_LOCAL_FILE_NAME = "current_local_files.txt";
   public static final String DATA_SNAPSHOT_NAME = "snapshot";
   public static final String SYNC_LOG_NAME = "sync.log";
-  public static final String CURRENT_SYNC_LOG_NAME = "current_sync.log";
 
   public static final String MESSAGE_DIGIT_NAME = "MD5";
   public static final String SYNC_DIR_NAME_SEPARATOR = "_";
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
index d4f7e33..1a529be 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
@@ -20,8 +20,6 @@ package org.apache.iotdb.db.sync.sender.manage;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
 
 public interface ISyncFileManager {
 
@@ -30,6 +28,4 @@ public interface ISyncFileManager {
   void getLastLocalFiles(File lastLocalFile) throws IOException;
 
   void getValidFiles(String dataDir) throws IOException;
-
-  void updateLastLocalFiles(File lastLocalFile, Set<String> localFiles);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
index 20f1dca..c08b64f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -125,15 +125,6 @@ public class SyncFileManager implements ISyncFileManager {
     }
   }
 
-  @Override
-  public void updateLastLocalFiles(File lastLocalFile, Set<String> localFiles) {
-
-  }
-
-  public Map<String, Set<File>> getCurrentSealedLocalFilesMap() {
-    return currentSealedLocalFilesMap;
-  }
-
   public Map<String, Set<File>> getLastLocalFilesMap() {
     return lastLocalFilesMap;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
index 3b33f78..53cb54f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
@@ -22,12 +22,12 @@ import java.util.Set;
 
 public interface ISyncSenderLogAnalyzer {
 
-  void loadLastLocalFiles(Set<String> set);
+  void loadLastLocalFiles(Set<String> lastLocalFiles);
 
-  void loadLogger(Set<String> set);
+  void loadLogger(Set<String> deletedFiles, Set<String> newFiles);
 
   void recover();
 
-  void clearLogger();
+  void clearLogger(Set<String> currentLocalFiles);
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
index 15df693..d1cdf9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
@@ -23,22 +23,14 @@ import java.io.IOException;
 
 public interface ISyncSenderLogger {
 
-  void startSync() throws IOException;
-
-  void endSync() throws IOException;
-
   void startSyncDeletedFilesName() throws IOException;
 
   void finishSyncDeletedFileName(File file) throws IOException;
 
-  void endSyncDeletedFilsName() throws IOException;
-
   void startSyncTsFiles() throws IOException;
 
   void finishSyncTsfile(File file) throws IOException;
 
-  void endSyncTsFiles() throws IOException;
-
   void close() throws IOException;
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
new file mode 100644
index 0000000..9649806
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.recover;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Rebuilds the sender's last-local-files list from the previously persisted list and the entries
+ * recorded in the sync log.
+ */
+public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderLogAnalyzer.class);
+  private final File currentLocalFile;
+  private final File lastLocalFile;
+  private final File syncLogFile;
+
+  /**
+   * @param senderPath directory that holds the two list files and the sync log
+   */
+  public SyncSenderLogAnalyzer(String senderPath) {
+    this.currentLocalFile = new File(senderPath, Constans.CURRENT_LOCAL_FILE_NAME);
+    this.lastLocalFile = new File(senderPath, Constans.LAST_LOCAL_FILE_NAME);
+    this.syncLogFile = new File(senderPath, Constans.SYNC_LOG_NAME);
+  }
+
+  /**
+   * Restores the last-local-files list. If only the current list exists it is promoted as-is;
+   * otherwise the sync log is replayed on the last list (logged deletions are removed, logged
+   * tsfiles are added) and the result is persisted through {@link #clearLogger(Set)}.
+   */
+  @Override
+  public void recover() {
+    if (currentLocalFile.exists() && !lastLocalFile.exists()) {
+      replaceLastLocalFile();
+      return;
+    }
+    Set<String> lastLocalFiles = new HashSet<>();
+    Set<String> deletedFiles = new HashSet<>();
+    Set<String> newFiles = new HashSet<>();
+    loadLastLocalFiles(lastLocalFiles);
+    loadLogger(deletedFiles, newFiles);
+    lastLocalFiles.removeAll(deletedFiles);
+    lastLocalFiles.addAll(newFiles);
+    clearLogger(lastLocalFiles);
+  }
+
+  /**
+   * Reads the last list, one entry per line, into {@code lastLocalFiles}. A missing list file is
+   * not treated as an error and leaves the set untouched.
+   */
+  @Override
+  public void loadLastLocalFiles(Set<String> lastLocalFiles) {
+    if (!lastLocalFile.exists()) {
+      return;
+    }
+    try (BufferedReader br = new BufferedReader(new FileReader(lastLocalFile))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        lastLocalFiles.add(line);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Can not load last local file list from file {}",
+          lastLocalFile.getAbsoluteFile(), e);
+    }
+  }
+
+  /**
+   * Parses the sync log. Lines after the deleted-file-names marker are added to
+   * {@code deletedFiles}, lines after the tsfile marker to {@code newFiles}; lines seen before
+   * any marker are ignored. A missing log file means there is nothing to replay.
+   */
+  @Override
+  public void loadLogger(Set<String> deletedFiles, Set<String> newFiles) {
+    if (!syncLogFile.exists()) {
+      return;
+    }
+    try (BufferedReader br = new BufferedReader(new FileReader(syncLogFile))) {
+      String line;
+      int mode = 0;
+      while ((line = br.readLine()) != null) {
+        if (line.equals(SyncSenderLogger.SYNC_DELETED_FILE_NAME_START)) {
+          mode = -1;
+        } else if (line.equals(SyncSenderLogger.SYNC_TSFILE_START)) {
+          mode = 1;
+        } else if (mode == -1) {
+          deletedFiles.add(line);
+        } else if (mode == 1) {
+          newFiles.add(line);
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Can not load sync log from file {}", syncLogFile.getAbsoluteFile(), e);
+    }
+  }
+
+  /**
+   * Persists {@code currentLocalFiles} as the new last list and, once that list is in place,
+   * deletes the sync log whose entries it now contains. If the list can not be written or moved,
+   * the previous list and the log are left untouched so that recovery can be retried.
+   */
+  @Override
+  public void clearLogger(Set<String> currentLocalFiles) {
+    try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
+      for (String line : currentLocalFiles) {
+        bw.write(line);
+        bw.newLine();
+      }
+    } catch (IOException e) {
+      LOGGER.error("Can not write current local file list to file {}",
+          currentLocalFile.getAbsoluteFile(), e);
+      return;
+    }
+    if (!replaceLastLocalFile()) {
+      return;
+    }
+    try {
+      Files.deleteIfExists(syncLogFile.toPath());
+    } catch (IOException e) {
+      LOGGER.error("Can not clear sync log {}", syncLogFile.getAbsoluteFile(), e);
+    }
+  }
+
+  /**
+   * Moves the current list over the last list, replacing the last list if it exists.
+   *
+   * @return true if the move succeeded, false if it failed (the failure is logged)
+   */
+  private boolean replaceLastLocalFile() {
+    try {
+      Files.move(currentLocalFile.toPath(), lastLocalFile.toPath(),
+          StandardCopyOption.REPLACE_EXISTING);
+      return true;
+    } catch (IOException e) {
+      LOGGER.error("Can not rename {} to {}", currentLocalFile.getAbsoluteFile(),
+          lastLocalFile.getAbsoluteFile(), e);
+      return false;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
index 8171d0f..8e118d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.sender.recover;
 
 import java.io.BufferedWriter;
@@ -7,12 +25,8 @@ import java.io.IOException;
 
 public class SyncSenderLogger implements ISyncSenderLogger {
 
-  public static final String SYNC_START = "sync start";
-  public static final String SYNC_END = "sync end";
   public static final String SYNC_DELETED_FILE_NAME_START = "sync deleted file names start";
-  public static final String SYNC_DELETED_FILE_NAME_END = "sync deleted file names end";
   public static final String SYNC_TSFILE_START = "sync tsfile start";
-  public static final String SYNC_TSFILE_END = "sync tsfile end";
   private BufferedWriter bw;
 
   public SyncSenderLogger(String filePath) throws IOException {
@@ -24,20 +38,6 @@ public class SyncSenderLogger implements ISyncSenderLogger {
   }
 
   @Override
-  public void startSync() throws IOException {
-    bw.write(SYNC_START);
-    bw.newLine();
-    bw.flush();
-  }
-
-  @Override
-  public void endSync() throws IOException {
-    bw.write(SYNC_END);
-    bw.newLine();
-    bw.flush();
-  }
-
-  @Override
   public void startSyncDeletedFilesName() throws IOException {
     bw.write(SYNC_DELETED_FILE_NAME_START);
     bw.newLine();
@@ -52,13 +52,6 @@ public class SyncSenderLogger implements ISyncSenderLogger {
   }
 
   @Override
-  public void endSyncDeletedFilsName() throws IOException {
-    bw.write(SYNC_DELETED_FILE_NAME_END);
-    bw.newLine();
-    bw.flush();
-  }
-
-  @Override
   public void startSyncTsFiles() throws IOException {
     bw.write(SYNC_TSFILE_START);
     bw.newLine();
@@ -73,13 +66,6 @@ public class SyncSenderLogger implements ISyncSenderLogger {
   }
 
   @Override
-  public void endSyncTsFiles() throws IOException {
-    bw.write(SYNC_TSFILE_END);
-    bw.newLine();
-    bw.flush();
-  }
-
-  @Override
   public void close() throws IOException {
     bw.close();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index c616376..fc68960 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -1,16 +1,4 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements.  See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with the License.  You may obtain
- * a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language
- * governing permissions and limitations under the License.
- */
+
 package org.apache.iotdb.db.sync.sender.transfer;
 
 import java.io.BufferedReader;
@@ -18,8 +6,6 @@ import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -33,12 +19,10 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
@@ -50,10 +34,10 @@ import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
 import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
 import org.apache.iotdb.db.sync.sender.manage.SyncFileManager;
+import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
 import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
 import org.apache.iotdb.db.utils.SyncUtils;
 import org.apache.iotdb.service.sync.thrift.ResultStatus;
-import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.thrift.TException;
@@ -91,7 +75,7 @@ public class DataTransferManager implements IDataTransferManager {
 
   private Map<String, Set<File>> sucessSyncedFilesMap;
 
-  private Map<String, Set<File>> successDeleyedFilesMap;
+  private Map<String, Set<File>> successDeletedFilesMap;
 
   private Map<String, Set<File>> lastLocalFilesMap;
 
@@ -102,11 +86,6 @@ public class DataTransferManager implements IDataTransferManager {
 
   private SyncSenderLogger syncLog;
 
-  /**
-   * Key means storage group, Set means corresponding tsfiles
-   **/
-  private Map<String, Set<String>> validFileSnapshot = new HashMap<>();
-
   private SyncFileManager syncFileManager = SyncFileManager.getInstance();
 
   private ScheduledExecutorService executorService;
@@ -130,6 +109,53 @@ public class DataTransferManager implements IDataTransferManager {
     fileSenderImpl.startTimedTask();
   }
 
+
+  /**
+   * The method is to verify whether the client lock file is locked or not, ensuring that only one
+   * client is running.
+   */
+  private void verifySingleton() throws IOException {
+    File lockFile = new File(config.getLockFilePath());
+    if (!lockFile.getParentFile().exists()) {
+      lockFile.getParentFile().mkdirs();
+    }
+    if (!lockFile.exists()) {
+      lockFile.createNewFile();
+    }
+    if (!lockInstance(config.getLockFilePath())) {
+      logger.error("Sync client is already running.");
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Try to lock the lock file. If locking fails, the sync client has already been started.
+   *
+   * @param lockFile path of lock file
+   */
+  private boolean lockInstance(final String lockFile) {
+    try {
+      final File file = new File(lockFile);
+      final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+      final FileLock fileLock = randomAccessFile.getChannel().tryLock();
+      if (fileLock != null) {
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+          try {
+            fileLock.release();
+            randomAccessFile.close();
+          } catch (Exception e) {
+            logger.error("Unable to remove lock file: {}", lockFile, e);
+          }
+        }));
+        return true;
+      }
+    } catch (Exception e) {
+      logger.error("Unable to create and/or lock file: {}", lockFile, e);
+    }
+    return false;
+  }
+
+
   @Override
   public void init() {
     if (executorService == null) {
@@ -221,6 +247,8 @@ public class DataTransferManager implements IDataTransferManager {
 
       // 1. Sync data
       for (Entry<String, Set<File>> entry : deletedFilesMap.entrySet()) {
+        checkRecovery();
+        syncLog = new SyncSenderLogger(getSyncLogFile());
         // TODO deal with the situation
         try {
           if (serviceClient.init(entry.getKey()) == ResultStatus.FAILURE) {
@@ -232,12 +260,10 @@ public class DataTransferManager implements IDataTransferManager {
         logger.info("Sync process starts to transfer data of storage group {}", entry.getKey());
         syncDeletedFilesName(entry.getKey(), entry.getValue());
         syncDataFilesInOneGroup(entry.getKey(), entry.getValue());
+        clearSyncLog();
       }
 
-      // 2. Clear sync log
-      clearSyncLog();
-
-    } catch (SyncConnectionException | TException e) {
+    } catch (SyncConnectionException e) {
       logger.error("cannot finish sync process", e);
     } finally {
       if (syncLog != null) {
@@ -248,23 +274,22 @@ public class DataTransferManager implements IDataTransferManager {
   }
 
   private void checkRecovery() {
-
-  }
-
-  public void clearSyncLog() {
-
+    new SyncSenderLogAnalyzer(config.getSenderPath()).recover();
   }
 
   @Override
-  public void syncDeletedFilesName(String sgName, Set<File> deletedFilesName) {
+  public void syncDeletedFilesName(String sgName, Set<File> deletedFilesName) throws IOException {
     if (deletedFilesName.isEmpty()) {
       logger.info("There has no deleted files to be synced in storage group {}", sgName);
       return;
     }
+    syncLog.startSyncDeletedFilesName();
     logger.info("Start to sync names of deleted files in storage group {}", sgName);
-    for(File file:deletedFilesName){
+    for (File file : deletedFilesName) {
       try {
         serviceClient.syncDeletedFileName(file.getName());
+        successDeletedFilesMap.get(sgName).add(file);
+        syncLog.finishSyncDeletedFileName(file);
       } catch (TException e) {
         logger.error("Can not sync deleted file name {}, skip it.", file);
       }
@@ -274,27 +299,29 @@ public class DataTransferManager implements IDataTransferManager {
 
   @Override
   public void syncDataFilesInOneGroup(String sgName, Set<File> toBeSyncFiles)
-      throws SyncConnectionException {
-    Set<String> validSnapshot = validFileSnapshot.get(sgName);
-    if (validSnapshot.isEmpty()) {
+      throws SyncConnectionException, IOException {
+    if (toBeSyncFiles.isEmpty()) {
       logger.info("There has no new tsfiles to be synced in storage group {}", sgName);
       return;
     }
+    syncLog.startSyncTsFiles();
     logger.info("Sync process starts to transfer data of storage group {}", sgName);
     int cnt = 0;
-    for (String tsfilePath : toBeSyncFiles) {
+    for (File tsfile : toBeSyncFiles) {
       cnt++;
       File snapshotFile = null;
       try {
-        snapshotFile = makeFileSnapshot(tsfilePath);
+        snapshotFile = makeFileSnapshot(tsfile);
         syncSingleFile(snapshotFile);
+        sucessSyncedFilesMap.get(sgName).add(tsfile);
+        syncLog.finishSyncTsfile(tsfile);
         logger.info("Task of synchronization has completed {}/{}.", cnt, toBeSyncFiles.size());
       } catch (IOException e) {
         logger.info(
             "Tsfile {} can not make snapshot, so skip the tsfile and continue to sync other tsfiles",
-            tsfilePath, e);
+            tsfile, e);
       } finally {
-        if(snapshotFile != null) {
+        if (snapshotFile != null) {
           snapshotFile.deleteOnExit();
         }
       }
@@ -302,27 +329,25 @@ public class DataTransferManager implements IDataTransferManager {
     logger.info("Sync process has finished storage group {}.", sgName);
   }
 
-  private File makeFileSnapshot(String filePath) throws IOException {
-    String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath);
-    File newFile = new File(snapshotFilePath);
-    if (!newFile.getParentFile().exists()) {
-      newFile.getParentFile().mkdirs();
+  private File makeFileSnapshot(File file) throws IOException { // returns the hard-linked copy
+    File snapshotFile = SyncUtils.getSnapshotFile(file);
+    if (!snapshotFile.getParentFile().exists()) {
+      snapshotFile.getParentFile().mkdirs();
     }
-    Path link = FileSystems.getDefault().getPath(snapshotFilePath);
-    Path target = FileSystems.getDefault().getPath(filePath);
+    Path link = FileSystems.getDefault().getPath(snapshotFile.getAbsolutePath()); // new link
+    Path target = FileSystems.getDefault().getPath(file.getAbsolutePath()); // existing tsfile
     Files.createLink(link, target);
-    return newFile;
+    return snapshotFile;
   }
 
-
   /**
    * Transfer data of a storage group to receiver.
-   *
    */
   private void syncSingleFile(File snapshotFile) throws SyncConnectionException {
     try {
       int retryCount = 0;
       MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
+      serviceClient.initSyncData(snapshotFile.getName());
       outer:
       while (true) {
         retryCount++;
@@ -341,9 +366,7 @@ public class DataTransferManager implements IDataTransferManager {
             md.update(buffer, 0, dataLength);
             ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
             bos.reset();
-            if (!Boolean.parseBoolean(serviceClient
-                .syncData(null, snapshotFile.getName(), buffToSend,
-                    SyncDataStatus.PROCESSING_STATUS))) {
+            if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
               logger.info("Receiver failed to receive data from {}, retry.",
                   snapshotFile.getAbsoluteFile());
               continue outer;
@@ -353,14 +376,13 @@ public class DataTransferManager implements IDataTransferManager {
 
         // the file is sent successfully
         String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
-        String md5OfReceiver = serviceClient.syncData(md5OfSender, snapshotFile.getName(),
-            null, SyncDataStatus.FINISH_STATUS);
+        String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
         if (md5OfSender.equals(md5OfReceiver)) {
           logger.info("Receiver has received {} successfully.", snapshotFile.getAbsoluteFile());
           break;
         }
       }
-    } catch (IOException e) {
+    } catch (IOException | TException | NoSuchAlgorithmException e) {
       throw new SyncConnectionException("Cannot sync data with receiver.", e);
     }
   }
@@ -389,67 +411,14 @@ public class DataTransferManager implements IDataTransferManager {
    * UUID marks the identity of sender for receiver.
    */
   @Override
-  public boolean confirmIdentity(String uuidPath) throws SyncConnectionException, IOException {
-    File file = new File(uuidPath);
-    /** Mark the identity of sender **/
-    String uuid;
-    if (!file.getParentFile().exists()) {
-      file.getParentFile().mkdirs();
-    }
-    if (!file.exists()) {
-      try (FileOutputStream out = new FileOutputStream(file)) {
-        file.createNewFile();
-        uuid = generateUUID();
-        out.write(uuid.getBytes());
-      } catch (IOException e) {
-        logger.error("Cannot insert UUID to file {}", file.getPath());
-        throw new IOException(e);
-      }
-    } else {
-      try (BufferedReader bf = new BufferedReader((new FileReader(uuidPath)))) {
-        uuid = bf.readLine();
-      } catch (IOException e) {
-        logger.error("Cannot read UUID from file{}", file.getPath());
-        throw new IOException(e);
-      }
-    }
-    boolean legalConnection;
+  public boolean confirmIdentity(String uuidPath) throws SyncConnectionException {
     try {
-      legalConnection = serviceClient.checkIdentity(uuid,
-          InetAddress.getLocalHost().getHostAddress());
+      return serviceClient.checkIdentity(InetAddress.getLocalHost().getHostAddress())
+          == ResultStatus.SUCCESS;
     } catch (Exception e) {
       logger.error("Cannot confirm identity with receiver");
       throw new SyncConnectionException(e);
     }
-    return legalConnection;
-  }
-
-  private String generateUUID() {
-    return Constans.SYNC_SENDER + UUID.randomUUID().toString().replaceAll("-", "");
-  }
-
-  /**
-   * Create snapshots for valid files.
-   */
-  @Override
-  public Set<String> makeFileSnapshot(Set<String> validFiles) {
-    Set<String> validFilesSnapshot = new HashSet<>();
-    for (String filePath : validFiles) {
-      try {
-        String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath);
-        File newFile = new File(snapshotFilePath);
-        if (!newFile.getParentFile().exists()) {
-          newFile.getParentFile().mkdirs();
-        }
-        Path link = FileSystems.getDefault().getPath(snapshotFilePath);
-        Path target = FileSystems.getDefault().getPath(filePath);
-        Files.createLink(link, target);
-        validFilesSnapshot.add(snapshotFilePath);
-      } catch (IOException e) {
-        logger.error("Can not make snapshot for file {}", filePath);
-      }
-    }
-    return validFilesSnapshot;
   }
 
   /**
@@ -521,14 +490,6 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
-  private File getSchemaPosFile() {
-    return new File(config.getSenderPath(), Constans.SCHEMA_POS_FILE_NAME)
-  }
-
-  private File getSchemaLogFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
-        MetadataConstant.METADATA_LOG);
-  }
 
   private boolean checkMD5ForSchema(String md5OfSender) throws TException {
     String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
@@ -536,7 +497,8 @@ public class DataTransferManager implements IDataTransferManager {
       logger.info("Receiver has received schema successfully, retry.");
       return true;
     } else {
-      logger.error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
+      logger
+          .error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
       return false;
     }
   }
@@ -566,49 +528,37 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
-  /**
-   * The method is to verify whether the client lock file is locked or not, ensuring that only one
-   * client is running.
-   */
-  private void verifySingleton() throws IOException {
-    File lockFile = new File(config.getLockFilePath());
-    if (!lockFile.getParentFile().exists()) {
-      lockFile.getParentFile().mkdirs();
-    }
-    if (!lockFile.exists()) {
-      lockFile.createNewFile();
+  private void clearSyncLog() {
+    for (Entry<String, Set<File>> entry : lastLocalFilesMap.entrySet()) {
+      entry.getValue() // files whose deletion was synced leave the list
+          .removeAll(successDeletedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
+      entry.getValue() // files that were transferred join the list, else they are re-sent
+          .addAll(sucessSyncedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
     }
-    if (!lockInstance(config.getLockFilePath())) {
-      logger.error("Sync client is already running.");
-      System.exit(1);
+    File currentLocalFile = getCurrentLogFile();
+    File lastLocalFile = new File(config.getLastFileInfo());
+    try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
+      for (Set<File> currentLocalFiles : lastLocalFilesMap.values()) {
+        for (File file : currentLocalFiles) {
+          bw.write(file.getAbsolutePath());
+          bw.newLine();
+        }
+      }
+    } catch (IOException e) {
+      logger.error("Can not clear sync log {}", lastLocalFile.getAbsoluteFile(), e);
+      return; // keep the previous list instead of replacing it with a partial one
     }
+    currentLocalFile.renameTo(lastLocalFile);
   }
 
-  /**
-   * Try to lock lockfile. if failed, it means that sync client has benn started.
-   *
-   * @param lockFile path of lock file
-   */
-  private boolean lockInstance(final String lockFile) {
-    try {
-      final File file = new File(lockFile);
-      final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
-      final FileLock fileLock = randomAccessFile.getChannel().tryLock();
-      if (fileLock != null) {
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-          try {
-            fileLock.release();
-            randomAccessFile.close();
-          } catch (Exception e) {
-            logger.error("Unable to remove lock file: {}", lockFile, e);
-          }
-        }));
-        return true;
-      }
-    } catch (Exception e) {
-      logger.error("Unable to create and/or lock file: {}", lockFile, e);
-    }
-    return false;
+
+  private File getSchemaPosFile() {
+    return new File(config.getSenderPath(), Constans.SCHEMA_POS_FILE_NAME);
+  }
+
+  private File getSchemaLogFile() {
+    return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
+        MetadataConstant.METADATA_LOG);
   }
 
   private static class InstanceHolder {
@@ -621,7 +571,7 @@ public class DataTransferManager implements IDataTransferManager {
   }
 
   private File getCurrentLogFile() {
-    return new File(config.getSenderPath(), Constans.CURRENT_SYNC_LOG_NAME);
+    return new File(config.getSenderPath(), Constans.CURRENT_LOCAL_FILE_NAME);
   }
 
   public void setConfig(SyncSenderConfig config) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
index 82b0d00..3366b91 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
@@ -45,22 +45,18 @@ public interface IDataTransferManager {
   boolean confirmIdentity(String uuidPath) throws SyncConnectionException, IOException;
 
   /**
-   * Make file snapshots before sending files.
-   */
-  Set<String> makeFileSnapshot(Set<String> validFiles);
-
-  /**
    * Send schema file to receiver.
    */
   void syncSchema() throws SyncConnectionException, TException;
 
   void syncDeletedFilesName(String sgName, Set<File> deletedFilesName)
-      throws SyncConnectionException;
+      throws SyncConnectionException, IOException;
 
   /**
    * For all valid files, send it to receiver side and load these data in receiver.
    */
-  void syncDataFilesInOneGroup(String sgName, Set<File> deletedFilesName) throws SyncConnectionException;
+  void syncDataFilesInOneGroup(String sgName, Set<File> deletedFilesName)
+      throws SyncConnectionException, IOException;
 
   /**
    * Execute a sync task.
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
index 8e7a9e8..13247cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
@@ -37,14 +37,13 @@ public class SyncUtils {
    * multiple directories, it's necessary to make a snapshot in the same disk. It's used by sync
    * sender.
    */
-  public static String getSnapshotFilePath(String filePath) {
-    String relativeFilePath =
-        new File(filePath).getParent() + File.separator + new File(filePath).getName();
+  public static File getSnapshotFile(File file) {
+    String relativeFilePath = file.getParent() + File.separator + file.getName();
     String snapshotDir = SyncSenderDescriptor.getInstance().getConfig().getSnapshotPath();
     if (!new File(snapshotDir).exists()) {
       new File(snapshotDir).mkdirs();
     }
-    return new File(snapshotDir, relativeFilePath).getAbsolutePath();
+    return new File(snapshotDir, relativeFilePath);
   }
 
   /**


[incubator-iotdb] 05/05: merge master

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2a67544931d80860829d137676c270e14b6e778d
Merge: b15ec6b ec4f051
Author: lta <li...@163.com>
AuthorDate: Thu Aug 22 11:13:29 2019 +0800

    merge master

 .gitignore                                         |   3 +-
 LICENSE-binary                                     |  19 -
 NOTICE                                             |   6 +-
 NOTICE-binary                                      |   2 +-
 README.md                                          |  65 ++--
 client/pom.xml                                     |  65 ++--
 .../src/assembly/client.xml                        |  20 +-
 .../assembly/resources}/sbin/start-client.bat      |   0
 .../assembly/resources}/sbin/start-client.sh       |   4 -
 .../assembly/resources}/tools/export-csv.bat       |   0
 .../assembly/resources}/tools/export-csv.sh        |   0
 .../assembly/resources}/tools/import-csv.bat       |   0
 .../assembly/resources}/tools/import-csv.sh        |   0
 .../apache/iotdb/cli/client/AbstractClient.java    |  46 ++-
 .../java/org/apache/iotdb/cli/client/Client.java   |  21 +-
 .../java/org/apache/iotdb/cli/tool/ExportCsv.java  |  44 ++-
 .../apache/iotdb/cli/client/AbstractScript.java    |  29 +-
 .../iotdb/cli/client/StartClientScriptIT.java      |  11 +-
 .../org/apache/iotdb/cli/tool/ExportCsvTestIT.java |   8 +-
 .../org/apache/iotdb/cli/tool/ImportCsvTestIT.java |   8 +-
 distribution/pom.xml                               | 100 +++++
 .../src}/assembly/distribution.xml                 |  40 +-
 docker/{ => src/main}/Dockerfile                   |  18 +-
 docs/Documentation-CHN/QuickStart.md               |  14 +-
 .../2-Data Type.md                                 |   2 +-
 docs/Documentation-CHN/UserGuide/8-Tools-Cli.md    |  59 ++-
 .../Documentation-CHN/UserGuide/8-Tools-Grafana.md |   2 +-
 docs/Documentation/Frequently asked questions.md   |  11 +-
 docs/Documentation/QuickStart.md                   |  13 +-
 .../2-Data Type.md                                 |   2 +-
 .../4-Deployment and Management/1-Deployment.md    |   5 +-
 .../7-Build and use IoTDB by Dockerfile.md         |   4 +-
 docs/Documentation/UserGuide/7-TsFile/2-Usage.md   |  90 ++++-
 docs/Documentation/UserGuide/8-Tools-Cli.md        |  47 ++-
 docs/Documentation/UserGuide/8-Tools-Grafana.md    |   6 +-
 example/kafka/pom.xml                              |  24 +-
 example/pom.xml                                    |  52 ++-
 example/rocketmq/pom.xml                           |  14 +-
 example/{ => tsfile}/pom.xml                       |  50 +--
 {tsfile/example => example/tsfile}/readme.md       |   2 +-
 .../java/org/apache/iotdb/tsfile/TsFileRead.java   |  10 +-
 .../apache/iotdb/tsfile/TsFileSequenceRead.java    |   8 +-
 .../iotdb/tsfile/TsFileWriteWithRowBatch.java      |  93 +++++
 .../iotdb/tsfile/TsFileWriteWithTSRecord.java      |   4 +-
 grafana/pom.xml                                    |   2 +-
 grafana/readme.md                                  |   6 +-
 grafana/readme_zh.md                               |   2 +-
 hadoop/pom.xml                                     |   6 +-
 jdbc/pom.xml                                       |  15 +-
 .../main/java/org/apache/iotdb/jdbc/Config.java    |   2 +
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     |  15 +-
 pom.xml                                            | 401 ++++++++++++++++++---
 server/pom.xml                                     |  85 ++---
 .../resources}/conf/iotdb-engine.properties        |  12 +
 .../assembly/resources}/conf/iotdb-env.bat         |   0
 .../assembly/resources}/conf/iotdb-env.sh          |   0
 .../resources}/conf/iotdb-sync-client.properties   |   0
 .../assembly/resources}/conf/logback-tool.xml      |   0
 .../assembly/resources}/conf/logback.xml           |   0
 .../resources}/conf/tsfile-format.properties       |   0
 .../assembly/resources}/sbin/start-server.bat      |   0
 .../assembly/resources}/sbin/start-server.sh       |   0
 .../assembly/resources}/sbin/stop-server.bat       |   0
 .../assembly/resources}/sbin/stop-server.sh        |   0
 .../assembly/resources}/tools/memory-tool.bat      |   0
 .../assembly/resources}/tools/memory-tool.sh       |   0
 .../assembly/resources}/tools/start-WalChecker.bat |   8 +-
 .../assembly/resources}/tools/start-WalChecker.sh  |  11 +-
 .../resources}/tools/start-sync-client.bat         |   0
 .../assembly/resources}/tools/start-sync-client.sh |   0
 .../assembly/resources}/tools/stop-sync-client.bat |   0
 .../assembly/resources}/tools/stop-sync-client.sh  |   0
 .../logback-tool.xml => src/assembly/server.xml}   |  20 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  53 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  34 ++
 .../iotdb/db/engine/cache/DeviceMetaDataCache.java |  22 +-
 .../iotdb/db/engine/cache/TsFileMetaDataCache.java |  18 +-
 .../iotdb/db/engine/cache/TsFileMetadataUtils.java |  18 +-
 .../apache/iotdb/db/engine/flush/FlushManager.java |  33 +-
 .../iotdb/db/engine/flush/FlushManagerMBean.java   |  32 ++
 .../db/engine/flush/pool/AbstractPoolManager.java  |   5 +-
 .../engine/flush/pool/FlushSubTaskPoolManager.java |   1 -
 .../db/engine/flush/pool/FlushTaskPoolManager.java |   1 +
 .../engine/storagegroup/StorageGroupProcessor.java |   2 +-
 .../org/apache/iotdb/db/service/JDBCService.java   |  11 +-
 .../org/apache/iotdb/db/service/ServiceType.java   |   9 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  44 ++-
 .../iotdb/db/sync/receiver/SyncServerManager.java  |  11 +-
 .../org/apache/iotdb/db/script/EnvScriptIT.java    |  34 +-
 spark-tsfile/pom.xml                               |  32 +-
 .../org/apache/iotdb/tool/TsFileWriteTool.java     |   2 +-
 .../org/apache/iotdb/tsfile/HDFSInputTest.java     |   2 +-
 tsfile/pom.xml                                     |   3 -
 .../encoding/encoder/DeltaBinaryEncoder.java       |   6 +-
 .../file/metadata/statistics/BinaryStatistics.java |  13 +
 .../metadata/statistics/BooleanStatistics.java     |  13 +
 .../file/metadata/statistics/DoubleStatistics.java |  12 +
 .../file/metadata/statistics/FloatStatistics.java  |  12 +
 .../metadata/statistics/IntegerStatistics.java     |  13 +
 .../file/metadata/statistics/LongStatistics.java   |  12 +
 .../file/metadata/statistics/NoStatistics.java     |  20 +
 .../file/metadata/statistics/Statistics.java       |  28 ++
 .../apache/iotdb/tsfile/write/TsFileWriter.java    |  57 ++-
 .../tsfile/write/chunk/ChunkGroupWriterImpl.java   |  48 ++-
 .../iotdb/tsfile/write/chunk/ChunkWriterImpl.java  | 102 +++++-
 .../tsfile/write/chunk/IChunkGroupWriter.java      |  16 +-
 .../iotdb/tsfile/write/chunk/IChunkWriter.java     |  35 ++
 .../apache/iotdb/tsfile/write/page/PageWriter.java |  70 ++++
 .../apache/iotdb/tsfile/write/record/RowBatch.java | 137 +++++++
 .../iotdb/tsfile/write/schema/FileSchema.java      |  86 ++++-
 .../tsfile/write/schema/MeasurementSchema.java     |   3 -
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |  18 +-
 .../tsfile/file/metadata/utils/TestHelper.java     |   9 +-
 .../iotdb/tsfile/file/metadata/utils/Utils.java    |   8 +-
 .../iotdb/tsfile/write/TsFileReadWriteTest.java    |  62 +++-
 .../write/schema/converter/SchemaBuilderTest.java  |   2 +-
 116 files changed, 2187 insertions(+), 568 deletions(-)



[incubator-iotdb] 02/05: complete manage module

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit ab86f899065649273be9cf188e7620c48fe120e0
Author: lta <li...@163.com>
AuthorDate: Wed Aug 21 15:39:25 2019 +0800

    complete manage module
---
 server/iotdb/conf/iotdb-sync-client.properties     |  12 +-
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   4 +
 .../db/conf/directories/DirectoryManager.java      |   8 +-
 .../org/apache/iotdb/db/metadata/MManager.java     |   2 +-
 .../db/sync/receiver/transfer/SyncServiceImpl.java |  65 +-
 .../receiver/transfer/SyncServiceImplBackup.java   | 736 +++++++++++++++++++++
 .../iotdb/db/sync/sender/SyncFileManager.java      | 208 ------
 .../apache/iotdb/db/sync/sender/conf/Constans.java |  17 +-
 .../db/sync/sender/conf/SyncSenderConfig.java      | 129 ++--
 .../db/sync/sender/conf/SyncSenderDescriptor.java  |  36 +-
 .../{IFileManager.java => ISyncFileManager.java}   |  12 +-
 .../db/sync/sender/manage/SyncFileManager.java     | 157 +++++
 .../db/sync/sender/recover/ISyncSenderLogger.java  |  21 +-
 .../db/sync/sender/recover/SyncSenderLogger.java   |  86 +++
 .../sync/sender/transfer/DataTransferManager.java  | 542 ++++++++-------
 .../sync/sender/transfer/IDataTransferManager.java |  16 +-
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  |  39 +-
 .../iotdb/db/sync/sender/SingleClientSyncTest.java |   4 +-
 .../iotdb/db/sync/sender/SyncFileManagerTest.java  |   2 +-
 service-rpc/src/main/thrift/sync.thrift            |  21 +-
 20 files changed, 1460 insertions(+), 657 deletions(-)

diff --git a/server/iotdb/conf/iotdb-sync-client.properties b/server/iotdb/conf/iotdb-sync-client.properties
index 65b3074..3606ab4 100644
--- a/server/iotdb/conf/iotdb-sync-client.properties
+++ b/server/iotdb/conf/iotdb-sync-client.properties
@@ -17,19 +17,11 @@
 # under the License.
 #
 
-# Sync server port address
+# Sync receiver server address
 server_ip=127.0.0.1
 
-# Sync client port
+# Sync receiver server port
 server_port=5555
 
 # The period time of sync process, the time unit is second.
 sync_period_in_second=600
-
-# Set bufferWrite data absolute path of IoTDB
-# It needs to be set with iotdb_schema_directory, they have to belong to the same IoTDB
-# iotdb_bufferWrite_directory = D:\\iotdb\\data\\data\\settled
-
-# Set schema file absolute path of IoTDB
-# It needs to be set with iotdb_bufferWrite_directory, they have to belong to the same IoTDB
-# iotdb_schema_directory = D:\\iotdb\\data\\system\\schema\\mlog.txt
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 3ea1621..13042aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -65,4 +65,8 @@ public class IoTDBConstant {
   public static final String USER = "User";
   public static final String PRIVILEGE = "Privilege";
 
+  // data folder name
+  public static final String SEQUENCE_FLODER_NAME = "sequence";
+  public static final String UNSEQUENCE_FLODER_NAME = "unsequence";
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
index 9b4b58d..e3b8d5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategy;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
@@ -44,14 +45,16 @@ public class DirectoryManager {
     sequenceFileFolders =
         new ArrayList<>(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
     for (int i = 0; i < sequenceFileFolders.size(); i++) {
-      sequenceFileFolders.set(i, sequenceFileFolders.get(i) + File.separator + "sequence");
+      sequenceFileFolders
+          .set(i, sequenceFileFolders.get(i) + File.separator + IoTDBConstant.SEQUENCE_FLODER_NAME);
     }
     mkDirs(sequenceFileFolders);
 
     unsequenceFileFolders =
         new ArrayList<>(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
     for (int i = 0; i < unsequenceFileFolders.size(); i++) {
-      unsequenceFileFolders.set(i, unsequenceFileFolders.get(i) + File.separator + "unsequence");
+      unsequenceFileFolders.set(i,
+          unsequenceFileFolders.get(i) + File.separator + IoTDBConstant.UNSEQUENCE_FLODER_NAME);
     }
     mkDirs(unsequenceFileFolders);
 
@@ -120,6 +123,7 @@ public class DirectoryManager {
   }
 
   private static class DirectoriesHolder {
+
     private static final DirectoryManager INSTANCE = new DirectoryManager();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 1116ff9..ad50573 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -80,7 +80,7 @@ public class MManager {
   private MManager() {
 
     schemaDir =
-        IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "schema";
+        IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
 
     File systemFolder = new File(schemaDir);
     if (!systemFolder.exists()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 4866d41..02bb5b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -45,10 +45,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.MetadataOperationType;
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.ResultStatus;
 import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
@@ -73,6 +74,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,22 +88,13 @@ public class SyncServiceImpl implements SyncService.Iface {
    **/
   private static final MManager metadataManger = MManager.getInstance();
 
-  private static final String SYNC_SERVER = Constans.SYNC_SERVER;
+  private static final String SYNC_SERVER = Constans.SYNC_RECEIVER;
 
-  private ThreadLocal<String> uuid = new ThreadLocal<>();
   /**
    * String means storage group,List means the set of new files(path) in local IoTDB and String
    * means path of new Files
    **/
   private ThreadLocal<Map<String, List<String>>> fileNodeMap = new ThreadLocal<>();
-  /**
-   * Map String1 means timeseries String2 means path of new Files, long means startTime
-   **/
-  private ThreadLocal<Map<String, Map<String, Long>>> fileNodeStartTime = new ThreadLocal<>();
-  /**
-   * Map String1 means timeseries String2 means path of new Files, long means endTime
-   **/
-  private ThreadLocal<Map<String, Map<String, Long>>> fileNodeEndTime = new ThreadLocal<>();
 
   /**
    * Total num of files that needs to be loaded
@@ -131,28 +124,41 @@ public class SyncServiceImpl implements SyncService.Iface {
   /**
    * Sync folder path of server
    **/
-  private String syncFolderPath;
+  private ThreadLocal<String> syncFolderPath = new ThreadLocal<>();
 
   /**
    * Sync data path of server
    */
-  private String syncDataPath;
+  private ThreadLocal<String> syncDataPath = new ThreadLocal<>();
+
+  private ThreadLocal<String> currentSG = new ThreadLocal<>();
+
+
+  /**
+   * Verify IP address of sender
+   */
+  @Override
+  public ResultStatus checkIdentity(String ipAddress) {
+    Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
+    initPath();
+    return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress) ? ResultStatus.SUCCESS
+        : ResultStatus.FAILURE;
+  }
 
   /**
    * Init threadLocal variable and delete old useless files.
    */
   @Override
-  public boolean init(String storageGroup) {
+  public ResultStatus init(String storageGroup) {
     logger.info("Sync process starts to receive data of storage group {}", storageGroup);
+    currentSG.set(storageGroup);
     fileNum.set(0);
     fileNodeMap.set(new HashMap<>());
-    fileNodeStartTime.set(new HashMap<>());
-    fileNodeEndTime.set(new HashMap<>());
     try {
       FileUtils.deleteDirectory(new File(syncDataPath));
     } catch (IOException e) {
       logger.error("cannot delete directory {} ", syncFolderPath);
-      return false;
+      return ResultStatus.FAILURE;
     }
     for (String bufferWritePath : bufferWritePaths) {
       bufferWritePath = FilePathUtils.regularizePath(bufferWritePath);
@@ -163,22 +169,11 @@ public class SyncServiceImpl implements SyncService.Iface {
           FileUtils.deleteDirectory(backupDirectory);
         } catch (IOException e) {
           logger.error("cannot delete directory {} ", syncFolderPath);
-          return false;
+          return ResultStatus.FAILURE;
         }
       }
     }
-    return true;
-  }
-
-  /**
-   * Verify IP address of sender
-   */
-  @Override
-  public boolean checkIdentity(String uuid, String ipAddress) {
-    Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
-    this.uuid.set(uuid);
-    initPath();
-    return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress);
+    return ResultStatus.SUCCESS;
   }
 
   /**
@@ -242,6 +237,16 @@ public class SyncServiceImpl implements SyncService.Iface {
     return md5OfReceiver;
   }
 
+  @Override
+  public String checkDataMD5(String md5) throws TException {
+    return null;
+  }
+
+  @Override
+  public void endSync() throws TException {
+
+  }
+
   /**
    * Load metadata from sender
    */
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java
new file mode 100644
index 0000000..6f2f754
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java
@@ -0,0 +1,736 @@
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied.  See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// */
+//package org.apache.iotdb.db.sync.receiver.transfer;
+//
+//import java.io.BufferedReader;
+//import java.io.File;
+//import java.io.FileInputStream;
+//import java.io.FileNotFoundException;
+//import java.io.FileOutputStream;
+//import java.io.IOException;
+//import java.math.BigInteger;
+//import java.nio.ByteBuffer;
+//import java.nio.channels.FileChannel;
+//import java.security.MessageDigest;
+//import java.util.ArrayList;
+//import java.util.Collections;
+//import java.util.HashMap;
+//import java.util.HashSet;
+//import java.util.Iterator;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.Map.Entry;
+//import java.util.Set;
+//import org.apache.commons.io.FileUtils;
+//import org.apache.commons.lang3.StringUtils;
+//import org.apache.iotdb.db.concurrent.ThreadName;
+//import org.apache.iotdb.db.conf.IoTDBConfig;
+//import org.apache.iotdb.db.conf.IoTDBDescriptor;
+//import org.apache.iotdb.db.conf.directories.DirectoryManager;
+//import org.apache.iotdb.db.engine.StorageEngine;
+//import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+//import org.apache.iotdb.db.exception.MetadataErrorException;
+//import org.apache.iotdb.db.exception.PathErrorException;
+//import org.apache.iotdb.db.exception.ProcessorException;
+//import org.apache.iotdb.db.exception.StorageEngineException;
+//import org.apache.iotdb.db.metadata.MManager;
+//import org.apache.iotdb.db.metadata.MetadataConstant;
+//import org.apache.iotdb.db.metadata.MetadataOperationType;
+//import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
+//import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+//import org.apache.iotdb.db.sync.sender.conf.Constans;
+//import org.apache.iotdb.db.utils.FilePathUtils;
+//import org.apache.iotdb.db.utils.SyncUtils;
+//import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
+//import org.apache.iotdb.service.sync.thrift.SyncService;
+//import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+//import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+//import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+//import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+//import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+//import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+//import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+//import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+//import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+//import org.apache.iotdb.tsfile.read.common.Field;
+//import org.apache.iotdb.tsfile.read.common.Path;
+//import org.apache.iotdb.tsfile.read.common.RowRecord;
+//import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+//import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//public class SyncServiceImplBackup implements SyncService.Iface {
+//
+//  private static final Logger logger = LoggerFactory.getLogger(SyncServiceImplBackup.class);
+//
+//  private static final StorageEngine STORAGE_GROUP_MANAGER = StorageEngine.getInstance();
+//  /**
+//   * Metadata manager
+//   **/
+//  private static final MManager metadataManger = MManager.getInstance();
+//
+//  private static final String SYNC_SERVER = Constans.SYNC_RECEIVER;
+//
+//  private ThreadLocal<String> uuid = new ThreadLocal<>();
+//  /**
+//   * String means storage group,List means the set of new files(path) in local IoTDB and String
+//   * means path of new Files
+//   **/
+//  private ThreadLocal<Map<String, List<String>>> fileNodeMap = new ThreadLocal<>();
+//  /**
+//   * Map String1 means timeseries String2 means path of new Files, long means startTime
+//   **/
+//  private ThreadLocal<Map<String, Map<String, Long>>> fileNodeStartTime = new ThreadLocal<>();
+//  /**
+//   * Map String1 means timeseries String2 means path of new Files, long means endTime
+//   **/
+//  private ThreadLocal<Map<String, Map<String, Long>>> fileNodeEndTime = new ThreadLocal<>();
+//
+//  /**
+//   * Total num of files that needs to be loaded
+//   */
+//  private ThreadLocal<Integer> fileNum = new ThreadLocal<>();
+//
+//  /**
+//   * IoTDB config
+//   **/
+//  private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+//
+//  /**
+//   * IoTDB data directory
+//   **/
+//  private String baseDir = config.getBaseDir();
+//
+//  /**
+//   * IoTDB  multiple bufferWrite directory
+//   **/
+//  private String[] bufferWritePaths = config.getDataDirs();
+//
+//  /**
+//   * The path to store metadata file of sender
+//   */
+//  private ThreadLocal<String> schemaFromSenderPath = new ThreadLocal<>();
+//
+//  /**
+//   * Sync folder path of server
+//   **/
+//  private String syncFolderPath;
+//
+//  /**
+//   * Sync data path of server
+//   */
+//  private String syncDataPath;
+//
+//  /**
+//   * Init threadLocal variable and delete old useless files.
+//   */
+//  @Override
+//  public boolean init(String storageGroup) {
+//    logger.info("Sync process starts to receive data of storage group {}", storageGroup);
+//    fileNum.set(0);
+//    fileNodeMap.set(new HashMap<>());
+//    fileNodeStartTime.set(new HashMap<>());
+//    fileNodeEndTime.set(new HashMap<>());
+//    try {
+//      FileUtils.deleteDirectory(new File(syncDataPath));
+//    } catch (IOException e) {
+//      logger.error("cannot delete directory {} ", syncFolderPath);
+//      return false;
+//    }
+//    for (String bufferWritePath : bufferWritePaths) {
+//      bufferWritePath = FilePathUtils.regularizePath(bufferWritePath);
+//      String backupPath = bufferWritePath + SYNC_SERVER + File.separator;
+//      File backupDirectory = new File(backupPath, this.uuid.get());
+//      if (backupDirectory.exists() && backupDirectory.list().length != 0) {
+//        try {
+//          FileUtils.deleteDirectory(backupDirectory);
+//        } catch (IOException e) {
+//          logger.error("cannot delete directory {} ", syncFolderPath);
+//          return false;
+//        }
+//      }
+//    }
+//    return true;
+//  }
+//
+//  /**
+//   * Verify IP address of sender
+//   */
+//  @Override
+//  public boolean checkIdentity(String uuid, String ipAddress) {
+//    Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
+//    this.uuid.set(uuid);
+//    initPath();
+//    return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress);
+//  }
+//
+//  /**
+//   * Init file path and clear data if last sync process failed.
+//   */
+//  private void initPath() {
+//    baseDir = FilePathUtils.regularizePath(baseDir);
+//    syncFolderPath = baseDir + SYNC_SERVER + File.separatorChar + this.uuid.get();
+//    syncDataPath = syncFolderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME;
+//    schemaFromSenderPath
+//        .set(syncFolderPath + File.separator + MetadataConstant.METADATA_LOG);
+//  }
+//
+//  /**
+//   * Acquire schema from sender
+//   *
+//   * @param status: FINISH_STATUS, SUCCESS_STATUS or PROCESSING_STATUS. status = FINISH_STATUS :
+//   * finish receiving schema file, start to sync schema. status = PROCESSING_STATUS : the schema
+//   * file has not been received completely. status = SUCCESS_STATUS : load metadata.
+//   */
+//  @Override
+//  public String syncSchema(String md5, ByteBuffer schema, SyncDataStatus status) {
+//    String md5OfReceiver = Boolean.toString(Boolean.TRUE);
+//    if (status == SyncDataStatus.SUCCESS_STATUS) {
+//      /** sync metadata, include storage group and timeseries **/
+//      return Boolean.toString(loadMetadata());
+//    } else if (status == SyncDataStatus.PROCESSING_STATUS) {
+//      File file = new File(schemaFromSenderPath.get());
+//      if (!file.getParentFile().exists()) {
+//        try {
+//          file.getParentFile().mkdirs();
+//          file.createNewFile();
+//        } catch (IOException e) {
+//          logger.error("Cannot make schema file {}.", file.getPath(), e);
+//          md5OfReceiver = Boolean.toString(Boolean.FALSE);
+//        }
+//      }
+//      try (FileOutputStream fos = new FileOutputStream(file, true);
+//          FileChannel channel = fos.getChannel()) {
+//        channel.write(schema);
+//      } catch (Exception e) {
+//        logger.error("Cannot insert data to file {}.", file.getPath(), e);
+//        md5OfReceiver = Boolean.toString(Boolean.FALSE);
+//      }
+//    } else {
+//      try (FileInputStream fis = new FileInputStream(schemaFromSenderPath.get())) {
+//        MessageDigest md = MessageDigest.getInstance("MD5");
+//        byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
+//        int n;
+//        while ((n = fis.read(buffer)) != -1) {
+//          md.update(buffer, 0, n);
+//        }
+//        md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
+//        if (!md5.equals(md5OfReceiver)) {
+//          FileUtils.forceDelete(new File(schemaFromSenderPath.get()));
+//        }
+//      } catch (Exception e) {
+//        logger.error("Receiver cannot generate md5 {}", schemaFromSenderPath.get(), e);
+//      }
+//    }
+//    return md5OfReceiver;
+//  }
+//
+//  /**
+//   * Load metadata from sender
+//   */
+//  private boolean loadMetadata() {
+//    if (new File(schemaFromSenderPath.get()).exists()) {
+//      try (BufferedReader br = new BufferedReader(
+//          new java.io.FileReader(schemaFromSenderPath.get()))) {
+//        String metadataOperation;
+//        while ((metadataOperation = br.readLine()) != null) {
+//          operation(metadataOperation);
+//        }
+//      } catch (FileNotFoundException e) {
+//        logger.error("Cannot read the file {}.",
+//            schemaFromSenderPath.get(), e);
+//        return false;
+//      } catch (IOException e) {
+//        /** multiple insert schema, ignore it **/
+//      } catch (Exception e) {
+//        logger.error("Parse metadata operation failed.", e);
+//        return false;
+//      }
+//    }
+//    return true;
+//  }
+//
+//  /**
+//   * Operate metadata operation in MManager
+//   *
+//   * @param cmd metadata operation
+//   */
+//  private void operation(String cmd)
+//      throws PathErrorException, IOException, MetadataErrorException {
+//    String[] args = cmd.trim().split(",");
+//    switch (args[0]) {
+//      case MetadataOperationType.ADD_PATH_TO_MTREE:
+//        Map<String, String> props;
+//        String[] kv;
+//        props = new HashMap<>(args.length - 5 + 1, 1);
+//        for (int k = 5; k < args.length; k++) {
+//          kv = args[k].split("=");
+//          props.put(kv[0], kv[1]);
+//        }
+//        metadataManger.addPathToMTree(new Path(args[1]), TSDataType.deserialize(Short.valueOf(args[2])),
+//            TSEncoding.deserialize(Short.valueOf(args[3])),
+//            CompressionType.deserialize(Short.valueOf(args[4])),
+//            props);
+//        break;
+//      case MetadataOperationType.DELETE_PATH_FROM_MTREE:
+//        metadataManger.deletePaths(Collections.singletonList(new Path(args[1])));
+//        break;
+//      case MetadataOperationType.SET_STORAGE_LEVEL_TO_MTREE:
+//        metadataManger.setStorageLevelToMTree(args[1]);
+//        break;
+//      case MetadataOperationType.ADD_A_PTREE:
+//        metadataManger.addAPTree(args[1]);
+//        break;
+//      case MetadataOperationType.ADD_A_PATH_TO_PTREE:
+//        metadataManger.addPathToPTree(args[1]);
+//        break;
+//      case MetadataOperationType.DELETE_PATH_FROM_PTREE:
+//        metadataManger.deletePathFromPTree(args[1]);
+//        break;
+//      case MetadataOperationType.LINK_MNODE_TO_PTREE:
+//        metadataManger.linkMNodeToPTree(args[1], args[2]);
+//        break;
+//      case MetadataOperationType.UNLINK_MNODE_FROM_PTREE:
+//        metadataManger.unlinkMNodeFromPTree(args[1], args[2]);
+//        break;
+//      default:
+//        logger.error("Unrecognizable command {}", cmd);
+//    }
+//  }
+//
+//  /**
+//   * Start receiving tsfile from sender
+//   *
+//   * @param status SUCCESS_STATUS: one tsfile has been received completely; PROCESSING_STATUS:
+//   * the tsfile has not been received completely yet.
+//   */
+//  @Override
+//  public String syncData(String md5OfSender, List<String> filePathSplit,
+//      ByteBuffer dataToReceive, SyncDataStatus status) {
+//    String md5OfReceiver = Boolean.toString(Boolean.TRUE);
+//    FileChannel channel;
+//    /** Recombination File Path **/
+//    String filePath = StringUtils.join(filePathSplit, File.separatorChar);
+//    syncDataPath = FilePathUtils.regularizePath(syncDataPath);
+//    filePath = syncDataPath + filePath;
+//    if (status == SyncDataStatus.PROCESSING_STATUS) { // there are still data stream to add
+//      File file = new File(filePath);
+//      if (!file.getParentFile().exists()) {
+//        try {
+//          file.getParentFile().mkdirs();
+//          file.createNewFile();
+//        } catch (IOException e) {
+//          logger.error("cannot make file {}", file.getPath(), e);
+//          md5OfReceiver = Boolean.toString(Boolean.FALSE);
+//        }
+//      }
+//      try (FileOutputStream fos = new FileOutputStream(file, true)) {// append new data
+//        channel = fos.getChannel();
+//        channel.write(dataToReceive);
+//      } catch (IOException e) {
+//        logger.error("cannot insert data to file {}", file.getPath(), e);
+//        md5OfReceiver = Boolean.toString(Boolean.FALSE);
+//
+//      }
+//    } else { // all data in the same file has received successfully
+//      try (FileInputStream fis = new FileInputStream(filePath)) {
+//        MessageDigest md = MessageDigest.getInstance("MD5");
+//        byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
+//        int n;
+//        while ((n = fis.read(buffer)) != -1) {
+//          md.update(buffer, 0, n);
+//        }
+//        md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
+//        if (md5OfSender.equals(md5OfReceiver)) {
+//          fileNum.set(fileNum.get() + 1);
+//
+//          logger.info(String.format("Receiver has received %d files from sender", fileNum.get()));
+//        } else {
+//          FileUtils.forceDelete(new File(filePath));
+//        }
+//      } catch (Exception e) {
+//        logger.error("Receiver cannot generate md5 {}", filePath, e);
+//      }
+//    }
+//    return md5OfReceiver;
+//  }
+//
+//
+//  @Override
+//  public boolean load() {
+//    try {
+//      getFileNodeInfo();
+//      loadData();
+//    } catch (Exception e) {
+//      logger.error("fail to load data", e);
+//      return false;
+//    }
+//    return true;
+//  }
+//
+//  /**
+//   * Collect the info of all tsfiles sent from the sender, in preparation for merging their data
+//   */
+//  public void getFileNodeInfo() throws IOException {
+//    File dataFileRoot = new File(syncDataPath);
+//    File[] files = dataFileRoot.listFiles();
+//    int processedNum = 0;
+//    for (File storageGroupPB : files) {
+//      List<String> filesPath = new ArrayList<>();
+//      File[] filesSG = storageGroupPB.listFiles();
+//      for (File fileTF : filesSG) { // fileTF means TsFiles
+//        Map<String, Long> startTimeMap = new HashMap<>();
+//        Map<String, Long> endTimeMap = new HashMap<>();
+//        TsFileSequenceReader reader = null;
+//        try {
+//          reader = new TsFileSequenceReader(fileTF.getPath());
+//          Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
+//          Iterator<String> it = deviceIdMap.keySet().iterator();
+//          while (it.hasNext()) {
+//            String key = it.next();
+//            TsDeviceMetadataIndex device = deviceIdMap.get(key);
+//            startTimeMap.put(key, device.getStartTime());
+//            endTimeMap.put(key, device.getEndTime());
+//          }
+//        } catch (IOException e) {
+//          logger.error("Unable to read tsfile {}", fileTF.getPath());
+//          throw new IOException(e);
+//        } finally {
+//          try {
+//            if (reader != null) {
+//              reader.close();
+//            }
+//          } catch (IOException e) {
+//            logger.error("Cannot close tsfile stream {}", fileTF.getPath());
+//            throw new IOException(e);
+//          }
+//        }
+//        fileNodeStartTime.get().put(fileTF.getPath(), startTimeMap);
+//        fileNodeEndTime.get().put(fileTF.getPath(), endTimeMap);
+//        filesPath.add(fileTF.getPath());
+//        processedNum++;
+//        logger.info(String
+//            .format("Get tsfile info has complete : %d/%d", processedNum, fileNum.get()));
+//        fileNodeMap.get().put(storageGroupPB.getName(), filesPath);
+//      }
+//    }
+//  }
+//
+//
+//  /**
+//   * Merge the received data. If the data in a tsfile is new, append the tsfile to the storage
+//   * group directly. If the data is old, one of two merge strategies is used, depending on the
+//   * possibility of updating historical data.
+//   */
+//  public void loadData() throws StorageEngineException {
+//    syncDataPath = FilePathUtils.regularizePath(syncDataPath);
+//    int processedNum = 0;
+//    for (String storageGroup : fileNodeMap.get().keySet()) {
+//      List<String> filesPath = fileNodeMap.get().get(storageGroup);
+//      /** before loading external tsfiles, the files in the same storage group must be ordered **/
+//      Collections.sort(filesPath, (o1, o2) -> {
+//        Map<String, Long> startTimePath1 = fileNodeStartTime.get().get(o1);
+//        Map<String, Long> endTimePath2 = fileNodeEndTime.get().get(o2);
+//        for (Entry<String, Long> entry : endTimePath2.entrySet()) {
+//          if (startTimePath1.containsKey(entry.getKey())) {
+//            if (startTimePath1.get(entry.getKey()) > entry.getValue()) {
+//              return 1;
+//            } else {
+//              return -1;
+//            }
+//          }
+//        }
+//        return 0;
+//      });
+//
+//      for (String path : filesPath) {
+//        // get startTimeMap and endTimeMap
+//        Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path);
+//        Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path);
+//
+//        // create a new fileNode
+//        String header = syncDataPath;
+//        String relativePath = path.substring(header.length());
+//        TsFileResource fileNode = new TsFileResource(
+//            new File(DirectoryManager.getInstance().getNextFolderIndexForSequenceFile() +
+//                File.separator + relativePath), startTimeMap, endTimeMap
+//        );
+//        // call interface of load external file
+//        try {
+//          if (!STORAGE_GROUP_MANAGER.appendFileToStorageGroupProcessor(storageGroup, fileNode, path)) {
+//            // it is a file with unsequence data
+//            if (config.isUpdateHistoricalDataPossibility()) {
+//              loadOldData(path);
+//            } else {
+//              List<String> overlapFiles = STORAGE_GROUP_MANAGER.getOverlapFiles(
+//                  storageGroup,
+//                  fileNode, uuid.get());
+//              if (overlapFiles.isEmpty()) {
+//                loadOldData(path);
+//              } else {
+//                loadOldData(path, overlapFiles);
+//              }
+//            }
+//          }
+//        } catch (StorageEngineException | IOException | ProcessorException e) {
+//          logger.error("Can not load external file {}", path);
+//          throw new StorageEngineException(e);
+//        }
+//        processedNum++;
+//        logger.info(String
+//            .format("Merging files has completed : %d/%d", processedNum, fileNum.get()));
+//      }
+//    }
+//  }
+//
+//  /**
+//   * Insert all data in the tsfile into IoTDB.
+//   */
+//  public void loadOldData(String filePath) throws IOException, ProcessorException {
+//    Set<String> timeseriesSet = new HashSet<>();
+//    TsFileSequenceReader reader = null;
+//    QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
+//    try {
+//      /** use tsfile reader to get data **/
+//      reader = new TsFileSequenceReader(filePath);
+//      Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
+//      Iterator<Entry<String, TsDeviceMetadataIndex>> entryIterator = deviceIdMap.entrySet()
+//          .iterator();
+//      while (entryIterator.hasNext()) {
+//        Entry<String, TsDeviceMetadataIndex> deviceMIEntry = entryIterator.next();
+//        String deviceId = deviceMIEntry.getKey();
+//        TsDeviceMetadataIndex deviceMI = deviceMIEntry.getValue();
+//        TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
+//        List<ChunkGroupMetaData> rowGroupMetadataList = deviceMetadata.getChunkGroupMetaDataList();
+//        timeseriesSet.clear();
+//        /** firstly, get all timeseries in the same device **/
+//        for (ChunkGroupMetaData chunkGroupMetaData : rowGroupMetadataList) {
+//          List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData
+//              .getChunkMetaDataList();
+//          for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
+//            String measurementUID = chunkMetaData.getMeasurementUid();
+//            measurementUID = deviceId + "." + measurementUID;
+//            timeseriesSet.add(measurementUID);
+//          }
+//        }
+//        /** Secondly, use tsFile Reader to form InsertPlan **/
+//        ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
+//        List<Path> paths = new ArrayList<>();
+//        paths.clear();
+//        for (String timeseries : timeseriesSet) {
+//          paths.add(new Path(timeseries));
+//        }
+//        QueryExpression queryExpression = QueryExpression.create(paths, null);
+//        QueryDataSet queryDataSet = readTsFile.query(queryExpression);
+//        while (queryDataSet.hasNext()) {
+//          RowRecord record = queryDataSet.next();
+//          List<Field> fields = record.getFields();
+//          List<String> measurementList = new ArrayList<>();
+//          List<String> insertValues = new ArrayList<>();
+//          for (int i = 0; i < fields.size(); i++) {
+//            Field field = fields.get(i);
+//            if (!field.isNull()) {
+//              measurementList.add(paths.get(i).getMeasurement());
+//              if (fields.get(i).getDataType() == TSDataType.TEXT) {
+//                insertValues.add(String.format("'%s'", field.toString()));
+//              } else {
+//                insertValues.add(String.format("%s", field.toString()));
+//              }
+//            }
+//          }
+//          if (insertExecutor.insert(new InsertPlan(deviceId, record.getTimestamp(),
+//              measurementList.toArray(new String[0]), insertValues.toArray(new String[0])))) {
+//            throw new IOException("Inserting series data to IoTDB engine has failed.");
+//          }
+//        }
+//      }
+//    } catch (IOException e) {
+//      logger.error("Can not parse tsfile into SQL", e);
+//      throw new IOException(e);
+//    } catch (ProcessorException e) {
+//      logger.error("Meet error while processing non-query.");
+//      throw new ProcessorException(e);
+//    } finally {
+//      try {
+//        if (reader != null) {
+//          reader.close();
+//        }
+//      } catch (IOException e) {
+//        logger.error("Cannot close file stream {}", filePath, e);
+//      }
+//    }
+//  }
+//
+//  /**
+//   * Insert those valid data in the tsfile into IoTDB
+//   *
+//   * @param overlapFiles files which conflict with the sync file
+//   */
+//  public void loadOldData(String filePath, List<String> overlapFiles)
+//      throws IOException, ProcessorException {
+//    Set<String> timeseriesList = new HashSet<>();
+//    QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
+//    Map<String, ReadOnlyTsFile> tsfilesReaders = openReaders(filePath, overlapFiles);
+//    try {
+//      TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
+//      Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
+//      Iterator<String> it = deviceIdMap.keySet().iterator();
+//      while (it.hasNext()) {
+//        String deviceID = it.next();
+//        TsDeviceMetadataIndex deviceMI = deviceIdMap.get(deviceID);
+//        TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
+//        List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
+//            .getChunkGroupMetaDataList();
+//        timeseriesList.clear();
+//        /** firstly, get all timeseries in the same device **/
+//        for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
+//          List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData.getChunkMetaDataList();
+//          for (ChunkMetaData timeSeriesChunkMetaData : chunkMetaDataList) {
+//            String measurementUID = timeSeriesChunkMetaData.getMeasurementUid();
+//            measurementUID = deviceID + "." + measurementUID;
+//            timeseriesList.add(measurementUID);
+//          }
+//        }
+//        reader.close();
+//
+//        /** secondly, use tsFile Reader to form SQL **/
+//        ReadOnlyTsFile readOnlyTsFile = tsfilesReaders.get(filePath);
+//        List<Path> paths = new ArrayList<>();
+//        /** compare the data of one timeseries per round to get the valid data **/
+//        for (String timeseries : timeseriesList) {
+//          paths.clear();
+//          paths.add(new Path(timeseries));
+//          Set<InsertPlan> originDataPoints = new HashSet<>();
+//          QueryExpression queryExpression = QueryExpression.create(paths, null);
+//          QueryDataSet queryDataSet = readOnlyTsFile.query(queryExpression);
+//          Set<InsertPlan> newDataPoints = convertToInserPlans(queryDataSet, paths, deviceID);
+//
+//          /** get all data with the timeseries in all overlap files. **/
+//          for (String overlapFile : overlapFiles) {
+//            ReadOnlyTsFile readTsFileOverlap = tsfilesReaders.get(overlapFile);
+//            QueryDataSet queryDataSetOverlap = readTsFileOverlap.query(queryExpression);
+//            originDataPoints.addAll(convertToInserPlans(queryDataSetOverlap, paths, deviceID));
+//          }
+//
+//          /** If there is no overlapping data for the timeseries, insert all data in the sync file **/
+//          if (originDataPoints.isEmpty()) {
+//            for (InsertPlan insertPlan : newDataPoints) {
+//              if (insertExecutor.insert(insertPlan)) {
+//                throw new IOException("Inserting series data to IoTDB engine has failed.");
+//              }
+//            }
+//          } else {
+//            /** Compare every data to get valid data **/
+//            for (InsertPlan insertPlan : newDataPoints) {
+//              if (!originDataPoints.contains(insertPlan)) {
+//                if (insertExecutor.insert(insertPlan)) {
+//                  throw new IOException("Inserting series data to IoTDB engine has failed.");
+//                }
+//              }
+//            }
+//          }
+//        }
+//      }
+//    } catch (IOException e) {
+//      logger.error("Can not parse tsfile into SQL", e);
+//      throw new IOException(e);
+//    } catch (ProcessorException e) {
+//      logger.error("Meet error while processing non-query.", e);
+//      throw new ProcessorException(e);
+//    } finally {
+//      try {
+//        closeReaders(tsfilesReaders);
+//      } catch (IOException e) {
+//        logger.error("Cannot close file stream {}", filePath, e);
+//      }
+//    }
+//  }
+//
+//  private Set<InsertPlan> convertToInserPlans(QueryDataSet queryDataSet, List<Path> paths, String deviceID) throws IOException {
+//    Set<InsertPlan> plans = new HashSet<>();
+//    while (queryDataSet.hasNext()) {
+//      RowRecord record = queryDataSet.next();
+//      List<Field> fields = record.getFields();
+//      /** get all data with the timeseries in the sync file **/
+//      for (int i = 0; i < fields.size(); i++) {
+//        Field field = fields.get(i);
+//        String[] measurementList = new String[1];
+//        if (!field.isNull()) {
+//          measurementList[0] = paths.get(i).getMeasurement();
+//          InsertPlan insertPlan = new InsertPlan(deviceID, record.getTimestamp(),
+//              measurementList, new String[]{field.getDataType() == TSDataType.TEXT ? String.format("'%s'", field.toString())
+//              : field.toString()});
+//          plans.add(insertPlan);
+//        }
+//      }
+//    }
+//    return plans;
+//  }
+//
+//  /**
+//   * Open all tsfile readers and cache them
+//   */
+//  private Map<String, ReadOnlyTsFile> openReaders(String filePath, List<String> overlapFiles)
+//      throws IOException {
+//    Map<String, ReadOnlyTsFile> tsfileReaders = new HashMap<>();
+//    tsfileReaders.put(filePath, new ReadOnlyTsFile(new TsFileSequenceReader(filePath)));
+//    for (String overlapFile : overlapFiles) {
+//      tsfileReaders.put(overlapFile, new ReadOnlyTsFile(new TsFileSequenceReader(overlapFile)));
+//    }
+//    return tsfileReaders;
+//  }
+//
+//  /**
+//   * Close all tsfile readers
+//   */
+//  private void closeReaders(Map<String, ReadOnlyTsFile> readers) throws IOException {
+//    for (ReadOnlyTsFile tsfileReader : readers.values()) {
+//      tsfileReader.close();
+//    }
+//  }
+//
+//  /**
+//   * Release threadLocal variable resources
+//   */
+//  @Override
+//  public void cleanUp() {
+//    uuid.remove();
+//    fileNum.remove();
+//    fileNodeMap.remove();
+//    fileNodeStartTime.remove();
+//    fileNodeEndTime.remove();
+//    schemaFromSenderPath.remove();
+//    try {
+//      FileUtils.deleteDirectory(new File(syncFolderPath));
+//    } catch (IOException e) {
+//      logger.error("can not delete directory {}", syncFolderPath, e);
+//    }
+//    logger.info("Synchronization has finished!");
+//  }
+//
+//  public Map<String, List<String>> getFileNodeMap() {
+//    return fileNodeMap.get();
+//  }
+//
+//  public void setFileNodeMap(Map<String, List<String>> fileNodeMap) {
+//    this.fileNodeMap.set(fileNodeMap);
+//  }
+//
+//}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java
deleted file mode 100644
index 18c3b60..0000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.sender;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.sync.sender.conf.Constans;
-import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
-import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * SyncFileManager is used to pick up those tsfiles need to sync.
- */
-public class SyncFileManager {
-
-  private static final Logger logger = LoggerFactory.getLogger(SyncFileManager.class);
-
-  /**
-   * Files that need to be synchronized
-   **/
-  private Map<String, Set<String>> validAllFiles = new HashMap<>();
-
-  /**
-   * All tsfiles in last synchronization process
-   **/
-  private Set<String> lastLocalFiles = new HashSet<>();
-
-  /**
-   * All tsfiles in data directory
-   **/
-  private Map<String, Set<String>> currentLocalFiles = new HashMap<>();
-
-  private SyncSenderConfig syncConfig = SyncSenderDescriptor.getInstance().getConfig();
-
-  private IoTDBConfig systemConfig = IoTDBDescriptor.getInstance().getConfig();
-
-  private static final String RESTORE_SUFFIX = ".restore";
-
-  private SyncFileManager() {
-  }
-
-  public static final SyncFileManager getInstance() {
-    return FileManagerHolder.INSTANCE;
-  }
-
-  /**
-   * Initialize SyncFileManager.
-   */
-  public void init() throws IOException {
-    validAllFiles.clear();
-    lastLocalFiles.clear();
-    currentLocalFiles.clear();
-    getLastLocalFileList(syncConfig.getLastFileInfo());
-    getCurrentLocalFileList(systemConfig.getDataDirs());
-    getValidFileList();
-  }
-
-  /**
-   * get files that needs to be synchronized
-   */
-  public void getValidFileList() {
-    for (Entry<String, Set<String>> entry : currentLocalFiles.entrySet()) {
-      for (String path : entry.getValue()) {
-        if (!lastLocalFiles.contains(path)) {
-          validAllFiles.get(entry.getKey()).add(path);
-        }
-      }
-    }
-    logger.info("Acquire list of valid files.");
-    for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) {
-      for (String path : entry.getValue()) {
-        currentLocalFiles.get(entry.getKey()).remove(path);
-      }
-    }
-  }
-
-  /**
-   * get last local file list.
-   *
-   * @param path path
-   */
-  public void getLastLocalFileList(String path) throws IOException {
-    Set<String> fileList = new HashSet<>();
-    File file = new File(path);
-    if (!file.exists()) {
-      try {
-        file.createNewFile();
-      } catch (IOException e) {
-        throw new IOException("Cannot get last local file list", e);
-      }
-    } else {
-      try (BufferedReader bf = new BufferedReader(new FileReader(file))) {
-        String fileName;
-        while ((fileName = bf.readLine()) != null) {
-          fileList.add(fileName);
-        }
-      } catch (IOException e) {
-        logger.error("Cannot get last local file list when reading file {}.",
-            syncConfig.getLastFileInfo());
-        throw new IOException(e);
-      }
-    }
-    lastLocalFiles = fileList;
-  }
-
-  /**
-   * get current local file list.
-   *
-   * @param paths paths in String[] structure
-   */
-  public void getCurrentLocalFileList(String[] paths) {
-    for (String path : paths) {
-      if (!new File(path).exists()) {
-        continue;
-      }
-      File[] listFiles = new File(path).listFiles();
-      for (File storageGroup : listFiles) {
-        if (!storageGroup.isDirectory() || storageGroup.getName().equals(Constans.SYNC_CLIENT)) {
-          continue;
-        }
-        getStorageGroupFiles(storageGroup);
-      }
-    }
-  }
-
-  private void getStorageGroupFiles(File storageGroup) {
-    if (!currentLocalFiles.containsKey(storageGroup.getName())) {
-      currentLocalFiles.put(storageGroup.getName(), new HashSet<>());
-    }
-    if (!validAllFiles.containsKey(storageGroup.getName())) {
-      validAllFiles.put(storageGroup.getName(), new HashSet<>());
-    }
-    File[] files = storageGroup.listFiles();
-    for (File file : files) {
-      if (!file.getPath().endsWith(RESTORE_SUFFIX) && !new File(
-          file.getPath() + RESTORE_SUFFIX).exists()) {
-        currentLocalFiles.get(storageGroup.getName()).add(file.getPath());
-      }
-    }
-  }
-
-  /**
-   * backup current local file information.
-   *
-   * @param backupFile backup file path
-   */
-  public void backupNowLocalFileInfo(String backupFile) {
-    try (BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(backupFile))) {
-      for (Entry<String, Set<String>> entry : currentLocalFiles.entrySet()) {
-        for (String file : entry.getValue()) {
-          bufferedWriter.write(file + "\n");
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Cannot back up current local file info", e);
-    }
-  }
-
-  public Map<String, Set<String>> getValidAllFiles() {
-    return validAllFiles;
-  }
-
-  public Set<String> getLastLocalFiles() {
-    return lastLocalFiles;
-  }
-
-  public Map<String, Set<String>> getCurrentLocalFiles() {
-    return currentLocalFiles;
-  }
-
-  public void setCurrentLocalFiles(Map<String, Set<String>> newNowLocalFiles) {
-    currentLocalFiles = newNowLocalFiles;
-  }
-
-  private static class FileManagerHolder {
-
-    private static final SyncFileManager INSTANCE = new SyncFileManager();
-  }
-}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index c8e2ce2..05b67c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -24,15 +24,18 @@ public class Constans {
   }
 
   public static final String CONFIG_NAME = "iotdb-sync-client.properties";
-  public static final String SYNC_CLIENT = "sync-client";
-  public static final String SYNC_SERVER = "sync-server";
+  public static final String SYNC_SENDER = "sync-sender";
+  public static final String SYNC_RECEIVER = "sync-receiver";
 
-  public static final String LOCK_FILE_NAME = "sync-lock";
-  public static final String UUID_FILE_NAME = "uuid.txt";
+  public static final String LOCK_FILE_NAME = "sync_lock";
+  public static final String SCHEMA_POS_FILE_NAME = "sync_schema_pos";
   public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
-  public static final String DATA_SNAPSHOT_NAME = "data-snapshot";
+  public static final String DATA_SNAPSHOT_NAME = "snapshot";
+  public static final String SYNC_LOG_NAME = "sync.log";
+  public static final String CURRENT_SYNC_LOG_NAME = "current_sync.log";
 
-  public static final String BACK_UP_DIRECTORY_NAME = "backup";
+  public static final String MESSAGE_DIGIT_NAME = "MD5";
+  public static final String SYNC_DIR_NAME_SEPARATOR = "_";
 
   /**
    * Split data file , block size at each transmission
@@ -42,7 +45,7 @@ public class Constans {
   /**
    * Max try when syncing the same file to receiver fails.
    */
-  public static final int MAX_SYNC_FILE_TRY = 10;
+  public static final int MAX_SYNC_FILE_TRY = 5;
 
   private static final SyncSenderConfig CONFIG = SyncSenderDescriptor.getInstance().getConfig();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
index 572e5df..a076257 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
@@ -19,104 +19,35 @@
 package org.apache.iotdb.db.sync.sender.conf;
 
 import java.io.File;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.metadata.MetadataConstant;
-import org.apache.iotdb.db.utils.FilePathUtils;
 
 public class SyncSenderConfig {
 
-  private String[] seqFileDirectory = IoTDBDescriptor.getInstance().getConfig()
-      .getDataDirs();
-
-  private String dataDirectory = IoTDBDescriptor.getInstance().getConfig().getBaseDir();
-
-  private String lockFilePath;
-
-  private String uuidPath;
-
-  private String lastFileInfo;
-
-  private String[] snapshotPaths;
-
-  private String schemaPath;
-
   private String serverIp = "127.0.0.1";
 
   private int serverPort = 5555;
 
   private int syncPeriodInSecond = 10;
 
-  /**
-   * Init path
-   */
-  public void init() {
-    schemaPath = IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + MetadataConstant.METADATA_LOG;
-    if (dataDirectory.length() > 0
-        && dataDirectory.charAt(dataDirectory.length() - 1) != File.separatorChar) {
-      dataDirectory += File.separatorChar;
-    }
-    lockFilePath =
-        dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LOCK_FILE_NAME;
-    uuidPath = dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.UUID_FILE_NAME;
-    lastFileInfo =
-        dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME;
-    snapshotPaths = new String[seqFileDirectory.length];
-    for (int i = 0; i < seqFileDirectory.length; i++) {
-      seqFileDirectory[i] = new File(seqFileDirectory[i]).getAbsolutePath();
-      seqFileDirectory[i] = FilePathUtils.regularizePath(seqFileDirectory[i]);
-      snapshotPaths[i] = seqFileDirectory[i] + Constans.SYNC_CLIENT + File.separatorChar
-          + Constans.DATA_SNAPSHOT_NAME
-          + File.separatorChar;
-    }
-
-  }
-
-  public String[] getSeqFileDirectory() {
-    return seqFileDirectory;
-  }
-
-  public void setSeqFileDirectory(String[] seqFileDirectory) {
-    this.seqFileDirectory = seqFileDirectory;
-  }
-
-  public String getDataDirectory() {
-    return dataDirectory;
-  }
-
-  public void setDataDirectory(String dataDirectory) {
-    this.dataDirectory = dataDirectory;
-  }
-
-  public String getUuidPath() {
-    return uuidPath;
-  }
-
-  public void setUuidPath(String uuidPath) {
-    this.uuidPath = uuidPath;
-  }
-
-  public String getLastFileInfo() {
-    return lastFileInfo;
-  }
+  private String senderPath;
 
-  public void setLastFileInfo(String lastFileInfo) {
-    this.lastFileInfo = lastFileInfo;
-  }
+  private String lockFilePath;
 
-  public String[] getSnapshotPaths() {
-    return snapshotPaths;
-  }
+  private String lastFileInfo;
 
-  public void setSnapshotPaths(String[] snapshotPaths) {
-    this.snapshotPaths = snapshotPaths;
-  }
+  private String snapshotPath;
 
-  public String getSchemaPath() {
-    return schemaPath;
-  }
-
-  public void setSchemaPath(String schemaPath) {
-    this.schemaPath = schemaPath;
+  /**
+   * Update the sender paths based on the data directory; creates the snapshot directory if missing
+   */
+  public void update(String dataDirectory) {
+    senderPath = dataDirectory + File.separatorChar + Constans.SYNC_SENDER + File.separatorChar +
+        getSyncReceiverName();
+    lockFilePath = senderPath + File.separatorChar + Constans.LOCK_FILE_NAME;
+    lastFileInfo = senderPath + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME;
+    snapshotPath = senderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME;
+    if(!new File(snapshotPath).exists()){
+      new File(snapshotPath).mkdirs();
+    }
   }
 
   public String getServerIp() {
@@ -143,6 +74,14 @@ public class SyncSenderConfig {
     this.syncPeriodInSecond = syncPeriodInSecond;
   }
 
+  public String getSenderPath() {
+    return senderPath;
+  }
+
+  public void setSenderPath(String senderPath) {
+    this.senderPath = senderPath;
+  }
+
   public String getLockFilePath() {
     return lockFilePath;
   }
@@ -150,4 +89,24 @@ public class SyncSenderConfig {
   public void setLockFilePath(String lockFilePath) {
     this.lockFilePath = lockFilePath;
   }
+
+  public String getLastFileInfo() {
+    return lastFileInfo;
+  }
+
+  public void setLastFileInfo(String lastFileInfo) {
+    this.lastFileInfo = lastFileInfo;
+  }
+
+  public String getSnapshotPath() {
+    return snapshotPath;
+  }
+
+  public void setSnapshotPath(String snapshotPath) {
+    this.snapshotPath = snapshotPath;
+  }
+
+  public String getSyncReceiverName() {
+    return serverIp + Constans.SYNC_DIR_NAME_SEPARATOR + serverPort;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java
index 2427c10..df1b834 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
 import org.apache.iotdb.db.conf.IoTDBConstant;
-import org.apache.iotdb.db.utils.FilePathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,8 +37,8 @@ public class SyncSenderDescriptor {
     loadProps();
   }
 
-  public static final SyncSenderDescriptor getInstance() {
-    return PostBackDescriptorHolder.INSTANCE;
+  public static SyncSenderDescriptor getInstance() {
+    return SyncSenderDescriptorHolder.INSTANCE;
   }
 
   public SyncSenderConfig getConfig() {
@@ -54,7 +53,6 @@ public class SyncSenderDescriptor {
    * load an properties file and set sync config variables
    */
   private void loadProps() {
-    conf.init();
     InputStream inputStream;
     String url = System.getProperty(IoTDBConstant.IOTDB_CONF, null);
     if (url == null) {
@@ -90,42 +88,20 @@ public class SyncSenderDescriptor {
       conf.setSyncPeriodInSecond(Integer.parseInt(properties
           .getProperty("sync_period_in_second",
               Integer.toString(conf.getSyncPeriodInSecond()))));
-      conf.setSchemaPath(properties.getProperty("iotdb_schema_directory", conf.getSchemaPath()));
-      conf.setDataDirectory(
-          properties.getProperty("iotdb_bufferWrite_directory", conf.getDataDirectory()));
-      String dataDirectory = conf.getDataDirectory();
-      if (dataDirectory.length() > 0
-          && dataDirectory.charAt(dataDirectory.length() - 1) != File.separatorChar) {
-        dataDirectory += File.separatorChar;
-      }
-      conf.setUuidPath(
-          dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.UUID_FILE_NAME);
-      conf.setLastFileInfo(
-          dataDirectory + Constans.SYNC_CLIENT + File.separatorChar
-              + Constans.LAST_LOCAL_FILE_NAME);
-      String[] sequenceFileDirectory = conf.getSeqFileDirectory();
-      String[] snapshots = new String[conf.getSeqFileDirectory().length];
-      for (int i = 0; i < conf.getSeqFileDirectory().length; i++) {
-        sequenceFileDirectory[i] = FilePathUtils.regularizePath(sequenceFileDirectory[i]);
-        snapshots[i] = sequenceFileDirectory[i] + Constans.SYNC_CLIENT + File.separatorChar
-            + Constans.DATA_SNAPSHOT_NAME + File.separatorChar;
-      }
-      conf.setSeqFileDirectory(sequenceFileDirectory);
-      conf.setSnapshotPaths(snapshots);
     } catch (IOException e) {
-      logger.warn("Cannot load config file because {}, use default configuration", e);
+      logger.warn("Cannot load config file, use default configuration.", e);
     } catch (Exception e) {
-      logger.warn("Error format in config file because {}, use default configuration", e);
+      logger.warn("Error format in config file, use default configuration.", e);
     } finally {
       try {
         inputStream.close();
       } catch (IOException e) {
-        logger.error("Fail to close sync config file input stream because ", e);
+        logger.error("Fail to close sync config file input stream.", e);
       }
     }
   }
 
-  private static class PostBackDescriptorHolder {
+  private static class SyncSenderDescriptorHolder {
 
     private static final SyncSenderDescriptor INSTANCE = new SyncSenderDescriptor();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/IFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
similarity index 69%
rename from server/src/main/java/org/apache/iotdb/db/sync/sender/manage/IFileManager.java
rename to server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
index b1ba08f..d4f7e33 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/IFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
@@ -18,8 +18,18 @@
  */
 package org.apache.iotdb.db.sync.sender.manage;
 
-public interface IFileManager {
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
 
+/**
+ * Operations used by the sync sender to determine which local files have to be synced.
+ * The notes below describe what the implementation {@code SyncFileManager} does.
+ */
+public interface ISyncFileManager {
 
+  /**
+   * Collect the files currently present under the given data directory. SyncFileManager scans
+   * the sequence folder and keeps, per storage group folder, only the tsfiles that have a
+   * resource file and no modification file.
+   *
+   * @param dataDir path of the data directory to scan
+   */
+  void getCurrentLocalFiles(String dataDir);
 
+  /**
+   * Load the file list stored in the given record file. SyncFileManager reads it as text with
+   * one file path per line.
+   *
+   * @param lastLocalFile the record file to read
+   * @throws IOException if the record file cannot be read
+   */
+  void getLastLocalFiles(File lastLocalFile) throws IOException;
+
+  /**
+   * Work out the files to be synced and the deleted files for the given data directory.
+   * SyncFileManager does this by comparing the current files with the last recorded files.
+   *
+   * @param dataDir path of the data directory to examine
+   * @throws IOException if the last-local-file record cannot be read
+   */
+  void getValidFiles(String dataDir) throws IOException;
+
+  /**
+   * NOTE(review): presumably rewrites the record file with {@code localFiles}; the only
+   * implementation visible here is an empty stub, so the intended contract is unconfirmed.
+   */
+  void updateLastLocalFiles(File lastLocalFile, Set<String> localFiles);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
new file mode 100644
index 0000000..20f1dca
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.manage;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Singleton that works out, per storage group, which sealed tsfiles on disk are missing from the
+ * last-local-file record (to be synced) and which recorded files are no longer on disk (deleted).
+ */
+public class SyncFileManager implements ISyncFileManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncFileManager.class);
+
+  /** Storage group folder name to the sealed tsfiles currently found in the data dir. */
+  private Map<String, Set<File>> currentSealedLocalFilesMap;
+
+  /** Storage group folder name to the files listed in the last-local-file record. */
+  private Map<String, Set<File>> lastLocalFilesMap;
+
+  /** Storage group folder name to the recorded files that are no longer sealed files on disk. */
+  private Map<String, Set<File>> deletedFilesMap;
+
+  /** Storage group folder name to the sealed files on disk that are missing from the record. */
+  private Map<String, Set<File>> toBeSyncedFilesMap;
+
+  private SyncFileManager() {
+
+  }
+
+  public static final SyncFileManager getInstance() {
+    return SyncFileManagerHolder.INSTANCE;
+  }
+
+  /**
+   * Scans the sequence folder of the given data directory and records, per storage group folder,
+   * the sealed tsfiles: files that have a companion resource file and no companion modification
+   * file. A folder that does not exist or cannot be listed is treated as empty.
+   *
+   * @param dataDir path of the data directory to scan
+   */
+  @Override
+  public void getCurrentLocalFiles(String dataDir) {
+    LOGGER.info("Start to get current local files in data folder {}", dataDir);
+    currentSealedLocalFilesMap = new HashMap<>();
+    File[] allSGFolders = new File(
+        dataDir + File.separatorChar + IoTDBConstant.SEQUENCE_FLODER_NAME)
+        .listFiles();
+    if (allSGFolders == null) {
+      // listFiles() returns null when the folder is missing, is not a directory or is unreadable
+      return;
+    }
+    for (File sgFolder : allSGFolders) {
+      Set<File> sealedFiles =
+          currentSealedLocalFilesMap.computeIfAbsent(sgFolder.getName(), k -> new HashSet<>());
+      File[] filesInSG = sgFolder.listFiles();
+      if (filesInSG == null) {
+        // nothing can be listed here: keep the empty entry, exactly as for an empty folder
+        continue;
+      }
+      for (File file : filesInSG) {
+        File tsFile = new File(sgFolder.getAbsolutePath(), file.getName());
+        if (isSealedTsFile(tsFile)) {
+          sealedFiles.add(tsFile);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns true if the file is neither a resource nor a modification file and has a companion
+   * resource file but no companion modification file.
+   */
+  private static boolean isSealedTsFile(File file) {
+    String name = file.getName();
+    if (name.endsWith(ModificationFile.FILE_SUFFIX)
+        || name.endsWith(TsFileResource.RESOURCE_SUFFIX)) {
+      return false;
+    }
+    return new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists()
+        && !new File(file.getAbsolutePath() + ModificationFile.FILE_SUFFIX).exists();
+  }
+
+  /**
+   * Loads the last-local-file record, one file path per line, and groups the files by the name
+   * of their parent folder so that the keys match those built by
+   * {@link #getCurrentLocalFiles(String)}. Lines without a parent folder (including blank lines)
+   * are ignored, and a record file that does not exist yields an empty map.
+   *
+   * @param lastLocalFileInfo the record file to read
+   * @throws IOException if the record file exists but cannot be read
+   */
+  @Override
+  public void getLastLocalFiles(File lastLocalFileInfo) throws IOException {
+    LOGGER.info("Start to get last local files from last local file info {}",
+        lastLocalFileInfo.getAbsoluteFile());
+    lastLocalFilesMap = new HashMap<>();
+    if (!lastLocalFileInfo.exists()) {
+      // no record yet (presumably nothing has been synced before): every file counts as new
+      return;
+    }
+    try (BufferedReader reader = new BufferedReader(new FileReader(lastLocalFileInfo))) {
+      String fileName;
+      while ((fileName = reader.readLine()) != null) {
+        File sgFolder = new File(fileName).getParentFile();
+        if (sgFolder != null) {
+          // key by folder name, not by the full parent path, to match the current-files map
+          lastLocalFilesMap.computeIfAbsent(sgFolder.getName(), k -> new HashSet<>())
+              .add(new File(fileName));
+        }
+      }
+    }
+  }
+
+  /**
+   * Refreshes the current and the last file maps and derives from them, for every storage group
+   * that appears in either map, the files to be synced (sealed on disk, absent from the record)
+   * and the deleted files (in the record, no longer sealed on disk).
+   *
+   * @param dataDir path of the data directory to examine
+   * @throws IOException if the last-local-file record cannot be read
+   */
+  @Override
+  public void getValidFiles(String dataDir) throws IOException {
+    getCurrentLocalFiles(dataDir);
+    getLastLocalFiles(new File(SyncSenderDescriptor.getInstance().getConfig().getLastFileInfo()));
+    toBeSyncedFilesMap = new HashMap<>();
+    deletedFilesMap = new HashMap<>();
+    // a storage group may be brand new (current only) or entirely removed (record only)
+    Set<String> allSGNames = new HashSet<>(currentSealedLocalFilesMap.keySet());
+    allSGNames.addAll(lastLocalFilesMap.keySet());
+    for (String sgName : allSGNames) {
+      Set<File> currentFiles = currentSealedLocalFilesMap.getOrDefault(sgName, new HashSet<>());
+      Set<File> lastFiles = lastLocalFilesMap.getOrDefault(sgName, new HashSet<>());
+
+      Set<File> newFiles = new HashSet<>(currentFiles);
+      newFiles.removeAll(lastFiles);
+      toBeSyncedFilesMap.put(sgName, newFiles);
+
+      Set<File> deletedFiles = new HashSet<>(lastFiles);
+      deletedFiles.removeAll(currentFiles);
+      deletedFilesMap.put(sgName, deletedFiles);
+    }
+  }
+
+  /**
+   * Not implemented yet: the body is empty, so calling it has no effect.
+   */
+  @Override
+  public void updateLastLocalFiles(File lastLocalFile, Set<String> localFiles) {
+
+  }
+
+  public Map<String, Set<File>> getCurrentSealedLocalFilesMap() {
+    return currentSealedLocalFilesMap;
+  }
+
+  public Map<String, Set<File>> getLastLocalFilesMap() {
+    return lastLocalFilesMap;
+  }
+
+  public Map<String, Set<File>> getDeletedFilesMap() {
+    return deletedFilesMap;
+  }
+
+  public Map<String, Set<File>> getToBeSyncedFilesMap() {
+    return toBeSyncedFilesMap;
+  }
+
+  private static class SyncFileManagerHolder {
+
+    private static final SyncFileManager INSTANCE = new SyncFileManager();
+
+    private SyncFileManagerHolder() {
+
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
index c516936..15df693 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
@@ -18,18 +18,27 @@
  */
 package org.apache.iotdb.db.sync.sender.recover;
 
+import java.io.File;
+import java.io.IOException;
+
 public interface ISyncSenderLogger {
 
-  void startSyncDeletedFilesName();
+  void startSync() throws IOException;
+
+  void endSync() throws IOException;
+
+  void startSyncDeletedFilesName() throws IOException;
+
+  void finishSyncDeletedFileName(File file) throws IOException;
 
-  void finishSyncDeletedFileName(String fileName);
+  void endSyncDeletedFilsName() throws IOException;
 
-  void endSyncDeletedFilsName();
+  void startSyncTsFiles() throws IOException;
 
-  void startSyncTsFiles();
+  void finishSyncTsfile(File file) throws IOException;
 
-  void finishSyncTsfile(String fileName);
+  void endSyncTsFiles() throws IOException;
 
-  void endSyncTsFiles();
+  void close() throws IOException;
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
new file mode 100644
index 0000000..8171d0f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
@@ -0,0 +1,86 @@
+package org.apache.iotdb.db.sync.sender.recover;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+/**
+ * Writes the progress of a sync task to a log file, one marker or absolute file path per line.
+ * Every record is flushed right after it is written.
+ */
+public class SyncSenderLogger implements ISyncSenderLogger {
+
+  public static final String SYNC_START = "sync start";
+  public static final String SYNC_END = "sync end";
+  public static final String SYNC_DELETED_FILE_NAME_START = "sync deleted file names start";
+  public static final String SYNC_DELETED_FILE_NAME_END = "sync deleted file names end";
+  public static final String SYNC_TSFILE_START = "sync tsfile start";
+  public static final String SYNC_TSFILE_END = "sync tsfile end";
+  private BufferedWriter writer;
+
+  public SyncSenderLogger(String filePath) throws IOException {
+    this.writer = new BufferedWriter(new FileWriter(filePath));
+  }
+
+  public SyncSenderLogger(File file) throws IOException {
+    this(file.getAbsolutePath());
+  }
+
+  @Override
+  public void startSync() throws IOException {
+    appendRecord(SYNC_START);
+  }
+
+  @Override
+  public void endSync() throws IOException {
+    appendRecord(SYNC_END);
+  }
+
+  @Override
+  public void startSyncDeletedFilesName() throws IOException {
+    appendRecord(SYNC_DELETED_FILE_NAME_START);
+  }
+
+  @Override
+  public void finishSyncDeletedFileName(File file) throws IOException {
+    appendRecord(file.getAbsolutePath());
+  }
+
+  @Override
+  public void endSyncDeletedFilsName() throws IOException {
+    appendRecord(SYNC_DELETED_FILE_NAME_END);
+  }
+
+  @Override
+  public void startSyncTsFiles() throws IOException {
+    appendRecord(SYNC_TSFILE_START);
+  }
+
+  @Override
+  public void finishSyncTsfile(File file) throws IOException {
+    appendRecord(file.getAbsolutePath());
+  }
+
+  @Override
+  public void endSyncTsFiles() throws IOException {
+    appendRecord(SYNC_TSFILE_END);
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+
+  /**
+   * Writes one record followed by a line separator and flushes it immediately.
+   */
+  private void appendRecord(String record) throws IOException {
+    writer.write(record);
+    writer.newLine();
+    writer.flush();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index 59dc27e..c616376 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -1,28 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.iotdb.db.sync.sender.transfer;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.math.BigInteger;
@@ -33,27 +32,30 @@ import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.MessageDigest;
-import java.util.ArrayList;
+import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.SyncConnectionException;
-import org.apache.iotdb.db.sync.sender.SyncFileManager;
+import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
 import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.manage.SyncFileManager;
+import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
 import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.ResultStatus;
 import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
 import org.apache.iotdb.service.sync.thrift.SyncService;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -70,29 +72,36 @@ public class DataTransferManager implements IDataTransferManager {
 
   private static final Logger logger = LoggerFactory.getLogger(DataTransferManager.class);
 
-  private TTransport transport;
+  private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
 
-  private SyncService.Client serviceClient;
+  private static final int BATCH_LINE = 1000;
 
-  private List<String> schema = new ArrayList<>();
+  private int schemaFileLinePos;
 
-  private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
+  private TTransport transport;
+
+  private SyncService.Client serviceClient;
 
   /**
    * Files that need to be synchronized
    */
-  private Map<String, Set<String>> validAllFiles;
+  private Map<String, Set<File>> toBeSyncedFilesMap;
 
-  /**
-   * All tsfiles in data directory
-   **/
-  private Map<String, Set<String>> currentLocalFiles;
+  private Map<String, Set<File>> deletedFilesMap;
+
+  private Map<String, Set<File>> sucessSyncedFilesMap;
+
+  private Map<String, Set<File>> successDeleyedFilesMap;
+
+  private Map<String, Set<File>> lastLocalFilesMap;
 
   /**
    * If true, sync is in execution.
    **/
   private volatile boolean syncStatus = false;
 
+  private SyncSenderLogger syncLog;
+
   /**
    * Key means storage group, Set means corresponding tsfiles
    **/
@@ -106,14 +115,12 @@ public class DataTransferManager implements IDataTransferManager {
     init();
   }
 
-  public static final DataTransferManager getInstance() {
+  public static DataTransferManager getInstance() {
     return InstanceHolder.INSTANCE;
   }
 
   /**
    * Create a sender and sync files to the receiver.
-   *
-   * @param args not used
    */
   public static void main(String[] args) throws IOException {
     Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
@@ -137,7 +144,7 @@ public class DataTransferManager implements IDataTransferManager {
   private void startMonitor() {
     executorService.scheduleWithFixedDelay(() -> {
       if (syncStatus) {
-        logger.info("Sync process is in execution!");
+        logger.info("Sync process for receiver {} is in execution!", config.getSyncReceiverName());
       }
     }, Constans.SYNC_MONITOR_DELAY, Constans.SYNC_MONITOR_PERIOD, TimeUnit.SECONDS);
   }
@@ -148,8 +155,8 @@ public class DataTransferManager implements IDataTransferManager {
   private void startTimedTask() {
     executorService.scheduleWithFixedDelay(() -> {
       try {
-        sync();
-      } catch (SyncConnectionException | IOException e) {
+        syncAll();
+      } catch (SyncConnectionException | IOException | TException e) {
         logger.error("Sync failed", e);
         stop();
       }
@@ -162,101 +169,203 @@ public class DataTransferManager implements IDataTransferManager {
     executorService = null;
   }
 
-  /**
-   * Execute a sync task.
-   */
-  @Override
-  public void sync() throws SyncConnectionException, IOException {
+  public void syncAll() throws SyncConnectionException, IOException, TException {
 
-    //1. Clear old snapshots if necessary
-    for (String snapshotPath : config.getSnapshotPaths()) {
-      if (new File(snapshotPath).exists() && new File(snapshotPath).list().length != 0) {
-        // It means that the last task of sync does not succeed! Clear the files and start to sync again
-        FileUtils.deleteDirectory(new File(snapshotPath));
-      }
-    }
-
-    // 2. Acquire valid files and check
-    syncFileManager.init();
-    validAllFiles = syncFileManager.getValidAllFiles();
-    currentLocalFiles = syncFileManager.getCurrentLocalFiles();
-    if (SyncUtils.isEmpty(validAllFiles)) {
-      logger.info("There has no file to sync !");
-      return;
-    }
-
-    // 3. Connect to sync server and Confirm Identity
+    // 1. Connect to sync receiver and confirm identity
     establishConnection(config.getServerIp(), config.getServerPort());
-    if (!confirmIdentity(config.getUuidPath())) {
+    if (!confirmIdentity(config.getSenderPath())) {
       logger.error("Sorry, you do not have the permission to connect to sync receiver.");
       System.exit(1);
     }
 
-    // 4. Create snapshot
-    for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) {
-      validFileSnapshot.put(entry.getKey(), makeFileSnapshot(entry.getValue()));
+    // 2. Sync Schema
+    syncSchema();
+
+    // 3. Sync all data
+    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    for (String dataDir : dataDirs) {
+      logger.info("Start to sync data in data dir {}", dataDir);
+      config.update(dataDir);
+      SyncFileManager.getInstance().getValidFiles(dataDir);
+      lastLocalFilesMap = SyncFileManager.getInstance().getLastLocalFilesMap();
+      deletedFilesMap = SyncFileManager.getInstance().getDeletedFilesMap();
+      toBeSyncedFilesMap = SyncFileManager.getInstance().getToBeSyncedFilesMap();
+      checkRecovery();
+      if (SyncUtils.isEmpty(deletedFilesMap) && SyncUtils.isEmpty(toBeSyncedFilesMap)) {
+        logger.info("There has no data to sync in data dir {}", dataDir);
+        continue;
+      }
+      sync();
+      logger.info("Finish to sync data in data dir {}", dataDir);
     }
 
-    syncStatus = true;
+    // 4. notify receiver that synchronization finish
+    // At this point the synchronization has finished even if connection fails
+    try {
+      serviceClient.endSync();
+      transport.close();
+      logger.info("Sync process has finished.");
+    } catch (TException e) {
+      logger.error("Unable to connect to receiver.", e);
+    }
+  }
 
+  /**
+   * Execute a sync task.
+   */
+  @Override
+  public void sync() throws IOException {
     try {
-      // 5. Sync schema
-      syncSchema();
+      syncStatus = true;
+      syncLog = new SyncSenderLogger(getSchemaLogFile());
+
+      // 1. Sync data
+      for (Entry<String, Set<File>> entry : deletedFilesMap.entrySet()) {
+        // TODO deal with the situation
+        try {
+          if (serviceClient.init(entry.getKey()) == ResultStatus.FAILURE) {
+            throw new SyncConnectionException("unable init receiver");
+          }
+        } catch (TException | SyncConnectionException e) {
+          throw new SyncConnectionException("Unable to connect to receiver", e);
+        }
+        logger.info("Sync process starts to transfer data of storage group {}", entry.getKey());
+        syncDeletedFilesName(entry.getKey(), entry.getValue());
+        syncDataFilesInOneGroup(entry.getKey(), entry.getValue());
+      }
 
-      // 6. Sync data
-      syncAllData();
-    } catch (SyncConnectionException e) {
+      // 2. Clear sync log
+      clearSyncLog();
+
+    } catch (SyncConnectionException | TException e) {
       logger.error("cannot finish sync process", e);
+    } finally {
+      if (syncLog != null) {
+        syncLog.close();
+      }
       syncStatus = false;
-      return;
     }
+  }
 
-    // 7. clear snapshot
-    for (String snapshotPath : config.getSnapshotPaths()) {
-      FileUtils.deleteDirectory(new File(snapshotPath));
-    }
+  private void checkRecovery() {
+
+  }
+
+  public void clearSyncLog() {
 
-    // 8. notify receiver that synchronization finish
-    // At this point the synchronization has finished even if connection fails
-    try {
-      serviceClient.cleanUp();
-    } catch (TException e) {
-      logger.error("Unable to connect to receiver.", e);
-    }
-    transport.close();
-    logger.info("Sync process has finished.");
-    syncStatus = false;
   }
 
   @Override
-  public void syncAllData() throws SyncConnectionException {
-    for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) {
-      Set<String> validFiles = entry.getValue();
-      Set<String> validSnapshot = validFileSnapshot.get(entry.getKey());
-      if (validSnapshot.isEmpty()) {
-        continue;
+  public void syncDeletedFilesName(String sgName, Set<File> deletedFilesName) {
+    if (deletedFilesName.isEmpty()) {
+      logger.info("There has no deleted files to be synced in storage group {}", sgName);
+      return;
+    }
+    logger.info("Start to sync names of deleted files in storage group {}", sgName);
+    for(File file:deletedFilesName){
+      try {
+        serviceClient.syncDeletedFileName(file.getName());
+      } catch (TException e) {
+        logger.error("Can not sync deleted file name {}, skip it.", file);
       }
-      logger.info("Sync process starts to transfer data of storage group {}", entry.getKey());
+    }
+    logger.info("Finish to sync names of deleted files in storage group {}", sgName);
+  }
+
+  @Override
+  public void syncDataFilesInOneGroup(String sgName, Set<File> toBeSyncFiles)
+      throws SyncConnectionException {
+    Set<String> validSnapshot = validFileSnapshot.get(sgName);
+    if (validSnapshot.isEmpty()) {
+      logger.info("There has no new tsfiles to be synced in storage group {}", sgName);
+      return;
+    }
+    logger.info("Sync process starts to transfer data of storage group {}", sgName);
+    int cnt = 0;
+    for (String tsfilePath : toBeSyncFiles) {
+      cnt++;
+      File snapshotFile = null;
       try {
-        if (!serviceClient.init(entry.getKey())) {
-          throw new SyncConnectionException("unable init receiver");
+        snapshotFile = makeFileSnapshot(tsfilePath);
+        syncSingleFile(snapshotFile);
+        logger.info("Task of synchronization has completed {}/{}.", cnt, toBeSyncFiles.size());
+      } catch (IOException e) {
+        logger.info(
+            "Tsfile {} can not make snapshot, so skip the tsfile and continue to sync other tsfiles",
+            tsfilePath, e);
+      } finally {
+        if(snapshotFile != null) {
+          snapshotFile.deleteOnExit();
         }
-      } catch (TException e) {
-        throw new SyncConnectionException("Unable to connect to receiver", e);
       }
-      syncData(validSnapshot);
-      if (afterSynchronization()) {
-        currentLocalFiles.get(entry.getKey()).addAll(validFiles);
-        syncFileManager.setCurrentLocalFiles(currentLocalFiles);
-        syncFileManager.backupNowLocalFileInfo(config.getLastFileInfo());
-        logger.info("Sync process has finished storage group {}.", entry.getKey());
-      } else {
-        logger.error("Receiver cannot sync data, abandon this synchronization of storage group {}",
-            entry.getKey());
+    }
+    logger.info("Sync process has finished storage group {}.", sgName);
+  }
+
+  /**
+   * Creates a hard link to the given file at the snapshot path computed by
+   * {@code SyncUtils.getSnapshotFilePath}, creating the parent directories when they are missing.
+   *
+   * @param filePath path of the file to take a snapshot of
+   * @return the snapshot file (the newly created link)
+   * @throws IOException if the hard link cannot be created
+   */
+  private File makeFileSnapshot(String filePath) throws IOException {
+    String linkLocation = SyncUtils.getSnapshotFilePath(filePath);
+    File snapshot = new File(linkLocation);
+    File parentDir = snapshot.getParentFile();
+    if (!parentDir.exists()) {
+      parentDir.mkdirs();
+    }
+    Path linkPath = FileSystems.getDefault().getPath(linkLocation);
+    Files.createLink(linkPath, FileSystems.getDefault().getPath(filePath));
+    return snapshot;
+  }
+
+
+  /**
+   * Transfer one snapshot file to the receiver.
+   *
+   * <p>The file is read in chunks of {@code Constans.DATA_CHUNK_SIZE} bytes and every chunk is
+   * sent with {@code PROCESSING_STATUS}. After the last chunk the sender's digest is sent with
+   * {@code FINISH_STATUS} and compared with the digest returned by the receiver. A rejected
+   * chunk or a digest mismatch restarts the transfer of the whole file.
+   *
+   * @param snapshotFile the snapshot file to send
+   * @throws SyncConnectionException if the file is still not synced after
+   *     {@code Constans.MAX_SYNC_FILE_TRY} attempts, or if reading the file, calling the
+   *     receiver or creating the message digest fails
+   */
+  private void syncSingleFile(File snapshotFile) throws SyncConnectionException {
+    try {
+      int retryCount = 0;
+      MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
+      outer:
+      while (true) {
+        retryCount++;
+        if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
+          throw new SyncConnectionException(String
+              .format("Can not sync file %s after %s tries.", snapshotFile.getAbsoluteFile(),
+                  Constans.MAX_SYNC_FILE_TRY));
+        }
+        // every attempt starts from an empty digest, since the file is re-read from the start
+        md.reset();
+        byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
+        int dataLength;
+        try (FileInputStream fis = new FileInputStream(snapshotFile);
+            ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
+          while ((dataLength = fis.read(buffer)) != -1) { // cut the file into pieces to send
+            bos.write(buffer, 0, dataLength);
+            md.update(buffer, 0, dataLength);
+            ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+            bos.reset();
+            if (!Boolean.parseBoolean(serviceClient
+                .syncData(null, snapshotFile.getName(), buffToSend,
+                    SyncDataStatus.PROCESSING_STATUS))) {
+              logger.info("Receiver failed to receive data from {}, retry.",
+                  snapshotFile.getAbsoluteFile());
+              continue outer;
+            }
+          }
+        }
+
+        // the file is sent successfully
+        String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
+        String md5OfReceiver = serviceClient.syncData(md5OfSender, snapshotFile.getName(),
+            null, SyncDataStatus.FINISH_STATUS);
+        if (md5OfSender.equals(md5OfReceiver)) {
+          logger.info("Receiver has received {} successfully.", snapshotFile.getAbsoluteFile());
+          break;
+        }
+      }
+    } catch (IOException | TException | NoSuchAlgorithmException e) {
+      // thrift failures and an unknown digest algorithm are checked exceptions as well and
+      // must be wrapped, because this method only declares SyncConnectionException
+      throw new SyncConnectionException("Cannot sync data with receiver.", e);
+    }
+  }
 
+
   /**
    * Establish a connection between the sender and the receiver.
    *
@@ -271,7 +380,6 @@ public class DataTransferManager implements IDataTransferManager {
     try {
       transport.open();
     } catch (TTransportException e) {
-      syncStatus = false;
       logger.error("Cannot connect to server");
       throw new SyncConnectionException(e);
     }
@@ -317,19 +425,18 @@ public class DataTransferManager implements IDataTransferManager {
   }
 
   private String generateUUID() {
-    return Constans.SYNC_CLIENT + UUID.randomUUID().toString().replaceAll("-", "");
+    return Constans.SYNC_SENDER + UUID.randomUUID().toString().replaceAll("-", "");
   }
 
   /**
    * Create snapshots for valid files.
    */
   @Override
-  public Set<String> makeFileSnapshot(Set<String> validFiles) throws IOException {
+  public Set<String> makeFileSnapshot(Set<String> validFiles) {
     Set<String> validFilesSnapshot = new HashSet<>();
-    try {
-      for (String filePath : validFiles) {
+    for (String filePath : validFiles) {
+      try {
         String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath);
-        validFilesSnapshot.add(snapshotFilePath);
         File newFile = new File(snapshotFilePath);
         if (!newFile.getParentFile().exists()) {
           newFile.getParentFile().mkdirs();
@@ -337,144 +444,126 @@ public class DataTransferManager implements IDataTransferManager {
         Path link = FileSystems.getDefault().getPath(snapshotFilePath);
         Path target = FileSystems.getDefault().getPath(filePath);
         Files.createLink(link, target);
+        validFilesSnapshot.add(snapshotFilePath);
+      } catch (IOException e) {
+        logger.error("Can not make snapshot for file {}", filePath);
       }
-    } catch (IOException e) {
-      logger.error("Can not make fileSnapshot");
-      throw new IOException(e);
     }
     return validFilesSnapshot;
   }
 
   /**
-   * Transfer data of a storage group to receiver.
-   *
-   * @param fileSnapshotList list of sending snapshot files in a storage group.
-   */
-  public void syncData(Set<String> fileSnapshotList) throws SyncConnectionException {
-    try {
-      int successNum = 0;
-      for (String snapshotFilePath : fileSnapshotList) {
-        successNum++;
-        File file = new File(snapshotFilePath);
-        List<String> filePathSplit = new ArrayList<>();
-        String os = System.getProperty("os.name");
-        if (os.toLowerCase().startsWith("windows")) {
-          String[] name = snapshotFilePath.split(File.separator + File.separator);
-          filePathSplit.add(name[name.length - 2]);
-          filePathSplit.add(name[name.length - 1]);
-        } else {
-          String[] name = snapshotFilePath.split(File.separator);
-          filePathSplit.add(name[name.length - 2]);
-          filePathSplit.add(name[name.length - 1]);
-        }
-        int retryCount = 0;
-        // Get md5 of the file.
-        MessageDigest md = MessageDigest.getInstance("MD5");
-        outer:
-        while (true) {
-          retryCount++;
-          // Sync all data to receiver
-          if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
-            throw new SyncConnectionException(String
-                .format("can not sync file %s after %s tries.", snapshotFilePath,
-                    Constans.MAX_SYNC_FILE_TRY));
-          }
-          md.reset();
-          byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
-          int dataLength;
-          try (FileInputStream fis = new FileInputStream(file);
-              ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
-            while ((dataLength = fis.read(buffer)) != -1) { // cut the file into pieces to send
-              bos.write(buffer, 0, dataLength);
-              md.update(buffer, 0, dataLength);
-              ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
-              bos.reset();
-              if (!Boolean.parseBoolean(serviceClient
-                  .syncData(null, filePathSplit, buffToSend, SyncDataStatus.PROCESSING_STATUS))) {
-                logger.info("Receiver failed to receive data from {}, retry.", snapshotFilePath);
-                continue outer;
-              }
-            }
-          }
-
-          // the file is sent successfully
-          String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
-          String md5OfReceiver = serviceClient.syncData(md5OfSender, filePathSplit,
-              null, SyncDataStatus.FINISH_STATUS);
-          if (md5OfSender.equals(md5OfReceiver)) {
-            logger.info("Receiver has received {} successfully.", snapshotFilePath);
-            break;
-          }
-        }
-        logger.info(String.format("Task of synchronization has completed %d/%d.", successNum,
-            fileSnapshotList.size()));
-      }
-    } catch (Exception e) {
-      throw new SyncConnectionException("Cannot sync data with receiver.", e);
-    }
-  }
-
-  /**
    * Sync schema with receiver.
    */
   @Override
-  public void syncSchema() throws SyncConnectionException {
+  public void syncSchema() throws SyncConnectionException, TException {
     int retryCount = 0;
-    outer:
+    serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
     while (true) {
-      retryCount++;
       if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
         throw new SyncConnectionException(String
-            .format("can not sync schema after %s tries.", Constans.MAX_SYNC_FILE_TRY));
+            .format("Can not sync schema after %s tries.", Constans.MAX_SYNC_FILE_TRY));
       }
-      byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
-      try (FileInputStream fis = new FileInputStream(new File(config.getSchemaPath()));
-          ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
-        // Get md5 of the file.
-        MessageDigest md = MessageDigest.getInstance("MD5");
-        int dataLength;
-        while ((dataLength = fis.read(buffer)) != -1) { // cut the file into pieces to send
-          bos.write(buffer, 0, dataLength);
-          md.update(buffer, 0, dataLength);
+      try {
+        if (tryToSyncSchema()) {
+          writeSyncSchemaPos(getSchemaPosFile());
+          break;
+        }
+      } finally {
+        retryCount++;
+      }
+    }
+  }
+
+  private boolean tryToSyncSchema() { // one attempt; false tells syncSchema() to try again
+    int schemaPos = readSyncSchemaPos(getSchemaPosFile());
+
+    // start to sync file data and get md5 of this file.
+    try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
+      schemaFileLinePos = 0;
+      for (; schemaFileLinePos < schemaPos; schemaFileLinePos++) {
+        br.readLine(); // skip exactly the schemaPos lines recorded as already synced
+      }
+      MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
+      String line;
+      int cntLine = 0;
+      while ((line = br.readLine()) != null) {
+        schemaFileLinePos++;
+        byte[] singleLineData = BytesUtils.stringToBytes(line);
+        bos.write(singleLineData); // NOTE(review): readLine() strips the line break - confirm
+        md.update(singleLineData);
+        if (cntLine++ == BATCH_LINE) {
           ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
           bos.reset();
           // PROCESSING_STATUS represents there is still schema buffer to send.
-          if (!Boolean.parseBoolean(
-              serviceClient.syncSchema(null, buffToSend, SyncDataStatus.PROCESSING_STATUS))) {
+          if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
             logger.error("Receiver failed to receive metadata, retry.");
-            continue outer;
+            return false;
           }
+          cntLine = 0;
         }
-        bos.close();
-        String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
-        String md5OfReceiver = serviceClient
-            .syncSchema(md5OfSender, null, SyncDataStatus.FINISH_STATUS);
-        if (md5OfSender.equals(md5OfReceiver)) {
-          logger.info("Receiver has received schema successfully.");
-          /** receiver start to load metadata **/
-          if (Boolean
-              .parseBoolean(serviceClient.syncSchema(null, null, SyncDataStatus.SUCCESS_STATUS))) {
-            throw new SyncConnectionException("Receiver failed to load metadata");
-          }
-          break;
+      }
+      if (bos.size() != 0) { // flush the last, partially filled batch
+        ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+        bos.reset();
+        if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
+          logger.error("Receiver failed to receive metadata, retry.");
+          return false;
         }
-      } catch (Exception e) {
-        logger.error("Cannot sync schema ", e);
-        throw new SyncConnectionException(e);
       }
+
+      // check md5
+      return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16));
+    } catch (NoSuchAlgorithmException | IOException | TException e) {
+      logger.error("Can not finish transfer schema to receiver", e);
+      return false;
     }
   }
 
-  @Override
-  public boolean afterSynchronization() throws SyncConnectionException {
-    boolean successOrNot;
+  private File getSchemaPosFile() { // file holding the schema-log line position last synced
+    return new File(config.getSenderPath(), Constans.SCHEMA_POS_FILE_NAME);
+  }
+
+  private File getSchemaLogFile() { // metadata log inside the configured schema directory
+    return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
+        MetadataConstant.METADATA_LOG);
+  }
+
+  private boolean checkMD5ForSchema(String md5OfSender) throws TException {
+    String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
+    if (md5OfSender.equals(md5OfReceiver)) {
+      logger.info("Receiver has received schema successfully.");
+      return true; // digests match: this round of schema transfer is complete
+    } else {
+      logger.error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
+      return false; // mismatch: the caller treats false as a failed attempt
+    }
+  }
+
+  private int readSyncSchemaPos(File syncSchemaLogFile) { // returns 0 when nothing usable is stored
     try {
-      successOrNot = serviceClient.load();
-    } catch (TException e) {
-      throw new SyncConnectionException(
-          "Can not finish sync process because sync receiver has broken down.", e);
+      if (syncSchemaLogFile.exists()) {
+        try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
+          return Integer.parseInt(br.readLine()); // empty file -> null -> NumberFormatException
+        }
+      }
+    } catch (IOException | NumberFormatException e) {
+      logger.error("Can not read sync schema position from file {}", syncSchemaLogFile.getAbsoluteFile(), e);
+    }
+    return 0;
+  }
+
+  /**
+   * Persist {@code schemaFileLinePos} to the given file; FileWriter creates the file if absent.
+   * An I/O failure is logged and not propagated to the caller.
+   */
+  private void writeSyncSchemaPos(File syncSchemaLogFile) {
+    try (BufferedWriter br = new BufferedWriter(new FileWriter(syncSchemaLogFile))) {
+      br.write(Integer.toString(schemaFileLinePos));
+    } catch (IOException e) {
+      logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
     }
-    return successOrNot;
   }
 
   /**
@@ -490,7 +579,7 @@ public class DataTransferManager implements IDataTransferManager {
       lockFile.createNewFile();
     }
     if (!lockInstance(config.getLockFilePath())) {
-      logger.error("Sync client is running.");
+      logger.error("Sync client is already running.");
       System.exit(1);
     }
   }
@@ -498,9 +587,9 @@ public class DataTransferManager implements IDataTransferManager {
   /**
    * Try to lock lockfile. if failed, it means that sync client has benn started.
    *
-   * @param lockFile path of lockfile
+   * @param lockFile path of lock file
    */
-  private static boolean lockInstance(final String lockFile) {
+  private boolean lockInstance(final String lockFile) {
     try {
       final File file = new File(lockFile);
       final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
@@ -510,7 +599,6 @@ public class DataTransferManager implements IDataTransferManager {
           try {
             fileLock.release();
             randomAccessFile.close();
-            FileUtils.forceDelete(file);
           } catch (Exception e) {
             logger.error("Unable to remove lock file: {}", lockFile, e);
           }
@@ -528,11 +616,15 @@ public class DataTransferManager implements IDataTransferManager {
     private static final DataTransferManager INSTANCE = new DataTransferManager();
   }
 
-  public void setConfig(SyncSenderConfig config) {
-    this.config = config;
+  private File getSyncLogFile() {
+    return new File(config.getSenderPath(), Constans.SYNC_LOG_NAME);
+  }
+
+  private File getCurrentLogFile() {
+    return new File(config.getSenderPath(), Constans.CURRENT_SYNC_LOG_NAME);
   }
 
-  public List<String> getSchema() {
-    return schema;
+  public void setConfig(SyncSenderConfig config) {
+    DataTransferManager.config = config;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
index 3941a90..82b0d00 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
@@ -18,9 +18,11 @@
  */
 package org.apache.iotdb.db.sync.sender.transfer;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Set;
 import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.thrift.TException;
 
 /**
  * SyncSender defines the methods of a sender in sync module.
@@ -45,22 +47,20 @@ public interface IDataTransferManager {
   /**
    * Make file snapshots before sending files.
    */
-  Set<String> makeFileSnapshot(Set<String> validFiles) throws IOException;
+  Set<String> makeFileSnapshot(Set<String> validFiles);
 
   /**
    * Send schema file to receiver.
    */
-  void syncSchema() throws SyncConnectionException;
+  void syncSchema() throws SyncConnectionException, TException;
 
-  /**
-   * For all valid files, send it to receiver side and load these data in receiver.
-   */
-  void syncAllData() throws SyncConnectionException;
+  void syncDeletedFilesName(String sgName, Set<File> deletedFilesName)
+      throws SyncConnectionException;
 
   /**
-   * Close the socket after sending files.
+   * For all valid files, send it to receiver side and load these data in receiver.
    */
-  boolean afterSynchronization() throws SyncConnectionException;
+  void syncDataFilesInOneGroup(String sgName, Set<File> deletedFilesName) throws SyncConnectionException;
 
   /**
    * Execute a sync task.
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
index 5a056a9..8e7a9e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
@@ -29,9 +29,6 @@ public class SyncUtils {
 
   private static final String IP_SEPARATOR = "\\.";
 
-  private static String[] snapshotPaths = SyncSenderDescriptor.getInstance()
-      .getConfig().getSnapshotPaths();
-
   private SyncUtils() {
   }
 
@@ -41,40 +38,20 @@ public class SyncUtils {
    * sender.
    */
   public static String getSnapshotFilePath(String filePath) {
-    String[] name;
-    String relativeFilePath;
-    String os = System.getProperty("os.name");
-    if (os.toLowerCase().startsWith("windows")) {
-      name = filePath.split(File.separator + File.separator);
-      relativeFilePath = name[name.length - 2] + File.separator + name[name.length - 1];
-    } else {
-      name = filePath.split(File.separator);
-      relativeFilePath = name[name.length - 2] + File.separator + name[name.length - 1];
-    }
-    String bufferWritePath = name[0];
-    for (int i = 1; i < name.length - 2; i++) {
-      bufferWritePath = bufferWritePath + File.separatorChar + name[i];
-    }
-    for (String snapshotPath : snapshotPaths) {
-      if (snapshotPath.startsWith(bufferWritePath)) {
-        if (!new File(snapshotPath).exists()) {
-          new File(snapshotPath).mkdir();
-        }
-        if (snapshotPath.length() > 0
-            && snapshotPath.charAt(snapshotPath.length() - 1) != File.separatorChar) {
-          snapshotPath = snapshotPath + File.separatorChar;
-        }
-        return snapshotPath + relativeFilePath;
-      }
+    String relativeFilePath =
+        new File(filePath).getParent() + File.separator + new File(filePath).getName();
+    String snapshotDir = SyncSenderDescriptor.getInstance().getConfig().getSnapshotPath();
+    if (!new File(snapshotDir).exists()) {
+      new File(snapshotDir).mkdirs();
     }
-    return null;
+    return new File(snapshotDir, relativeFilePath).getAbsolutePath();
   }
 
   /**
    * Verify sending list is empty or not It's used by sync sender.
    */
-  public static boolean isEmpty(Map<String, Set<String>> sendingFileList) {
-    for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) {
+  public static boolean isEmpty(Map<String, Set<File>> sendingFileList) {
+    for (Entry<String, Set<File>> entry : sendingFileList.entrySet()) {
       if (!entry.getValue().isEmpty()) {
         return false;
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
index ffb1f2b..7a06281 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
@@ -147,7 +147,7 @@ public class SingleClientSyncTest {
       "insert into root.test.d0(timestamp,s1) values(3000,'1309')",
       "insert into root.test.d1.g0(timestamp,s0) values(400,1050)", "merge", "flush",};
   private boolean testFlag = Constant.testFlag;
-  private static final String SYNC_CLIENT = Constans.SYNC_CLIENT;
+  private static final String SYNC_CLIENT = Constans.SYNC_SENDER;
   private static final Logger logger = LoggerFactory.getLogger(SingleClientSyncTest.class);
 
   public static void main(String[] args) throws Exception {
@@ -159,7 +159,7 @@ public class SingleClientSyncTest {
   }
 
   public void setConfig() {
-    config.setUuidPath(
+    config.setSenderPath(
         config.getDataDirectory() + SYNC_CLIENT + File.separator + Constans.UUID_FILE_NAME);
     config.setLastFileInfo(
         config.getDataDirectory() + SYNC_CLIENT + File.separator + Constans.LAST_LOCAL_FILE_NAME);
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
index 4c8356a..322dfe0 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 
 public class SyncFileManagerTest {
 
-  private static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC_CLIENT + File.separator;
+  private static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC_SENDER + File.separator;
   private static final String LAST_FILE_INFO_TEST =
       POST_BACK_DIRECTORY_TEST + Constans.LAST_LOCAL_FILE_NAME;
   private static final String SENDER_FILE_PATH_TEST = POST_BACK_DIRECTORY_TEST + "data";
diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift
index fec6079..6fc394a 100755
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/service-rpc/src/main/thrift/sync.thrift
@@ -22,17 +22,18 @@ typedef i32 int
 typedef i16 short
 typedef i64 long
 
-enum SyncDataStatus {
-  SUCCESS_STATUS,
-  FINISH_STATUS,
-  PROCESSING_STATUS
+enum ResultStatus {
+  SUCCESS,
+  FAILURE,
+  BUSY
 }
 
 service SyncService{
-	bool checkIdentity(1:string uuid, 2:string address)
-	string syncSchema(1:string md5, 2:binary buff, 3:SyncDataStatus status)
-	string syncData(1:string md5, 2:list<string> filename, 3:binary buff, 4:SyncDataStatus status)
-	bool load()
-	void cleanUp()
-	bool init(1:string storageGroupName)
+	ResultStatus checkIdentity(1:string address)
+	ResultStatus init(1:string storageGroupName)
+	ResultStatus syncDeletedFileName(1:string fileName)
+	ResultStatus initSyncData(1:string filename)
+	ResultStatus syncData(1:binary buff)
+	string checkDataMD5(1:string md5)
+	void endSync()
 }
\ No newline at end of file


[incubator-iotdb] 01/05: add sync new code framework

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 85cd7783b87e4a01fafb6dccd45629b2d2e3dc9f
Author: lta <li...@163.com>
AuthorDate: Mon Aug 12 20:18:02 2019 +0800

    add sync new code framework
---
 server/iotdb/tools/start-sync-client.bat           |  2 +-
 server/iotdb/tools/start-sync-client.sh            |  2 +-
 .../org/apache/iotdb/db/sync/package-info.java     | 20 +++++++++++++
 .../iotdb/db/sync/receiver/SyncServerManager.java  |  1 +
 .../iotdb/db/sync/receiver/load/IFileLoader.java   | 25 ++++++++++++++++
 .../receiver/recover/ISyncReceiverLogAnalyzer.java | 27 +++++++++++++++++
 .../sync/receiver/recover/ISyncReceiverLogger.java | 23 ++++++++++++++
 .../receiver/{ => transfer}/SyncServiceImpl.java   |  4 +--
 .../iotdb/db/sync/sender/SyncFileManager.java      |  6 ++--
 .../iotdb/db/sync/{ => sender}/conf/Constans.java  |  2 +-
 .../sync/{ => sender}/conf/SyncSenderConfig.java   |  2 +-
 .../{ => sender}/conf/SyncSenderDescriptor.java    |  2 +-
 .../iotdb/db/sync/sender/manage/IFileManager.java  | 25 ++++++++++++++++
 .../sender/recover/ISyncSenderLogAnalyzer.java     | 33 ++++++++++++++++++++
 .../db/sync/sender/recover/ISyncSenderLogger.java  | 35 ++++++++++++++++++++++
 .../DataTransferManager.java}                      | 21 ++++++-------
 .../IDataTransferManager.java}                     |  4 +--
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  |  2 +-
 .../iotdb/db/sync/sender/SingleClientSyncTest.java |  9 +++---
 .../iotdb/db/sync/sender/SyncFileManagerTest.java  |  2 +-
 20 files changed, 219 insertions(+), 28 deletions(-)

diff --git a/server/iotdb/tools/start-sync-client.bat b/server/iotdb/tools/start-sync-client.bat
index 49360ef..dc44fcd 100755
--- a/server/iotdb/tools/start-sync-client.bat
+++ b/server/iotdb/tools/start-sync-client.bat
@@ -29,7 +29,7 @@ set IOTDB_CONF=%IOTDB_HOME%\conf
 set IOTDB_LOGS=%IOTDB_HOME%\logs
 
 
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.sync.sender.SyncSenderImpl
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.sync.sender.transfer.DataTransferManager
 if NOT DEFINED JAVA_HOME goto :err
 
 @REM -----------------------------------------------------------------------------
diff --git a/server/iotdb/tools/start-sync-client.sh b/server/iotdb/tools/start-sync-client.sh
index c796197..a3ae56a 100755
--- a/server/iotdb/tools/start-sync-client.sh
+++ b/server/iotdb/tools/start-sync-client.sh
@@ -47,7 +47,7 @@ for f in ${IOTDB_HOME}/lib/*.jar; do
   CLASSPATH=${CLASSPATH}":"$f
 done
 
-MAIN_CLASS=org.apache.iotdb.db.sync.sender.SyncSenderImpl
+MAIN_CLASS=org.apache.iotdb.db.sync.sender.transfer.DataTransferManager
 
 "$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -DTSFILE_HOME=${IOTDB_HOME} -DIOTDB_CONF=${IOTDB_CONF} -Dlogback.configurationFile=${IOTDB_CONF}/logback.xml $IOTDB_DERBY_OPTS $IOTDB_JMX_OPTS -Dname=SyncClient -cp "$CLASSPATH" "$MAIN_CLASS"
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/package-info.java b/server/src/main/java/org/apache/iotdb/db/sync/package-info.java
new file mode 100644
index 0000000..97a4ec5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.sync;
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index e4e220b..ee1fd82 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
 import org.apache.thrift.protocol.TBinaryProtocol;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
new file mode 100644
index 0000000..aaa3ff7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.receiver.load;
+
+public interface IFileLoader {
+
+  void loadTsFiles();
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java
new file mode 100644
index 0000000..5d6351e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.receiver.recover;
+
+public interface ISyncReceiverLogAnalyzer {
+  void recover();
+
+  void scanLogger(String path);
+
+  void clearLogger();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogger.java
new file mode 100644
index 0000000..9fd3109
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogger.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.receiver.recover;
+
+public interface ISyncReceiverLogger {
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
rename to server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 1dcd4f5..4866d41 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.sync.receiver;
+package org.apache.iotdb.db.sync.receiver.transfer;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -54,7 +54,7 @@ import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.MetadataOperationType;
 import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.sync.conf.Constans;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.SyncUtils;
 import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java
index c7119aa..18c3b60 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncFileManager.java
@@ -31,9 +31,9 @@ import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.sync.conf.Constans;
-import org.apache.iotdb.db.sync.conf.SyncSenderConfig;
-import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java
rename to server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index c810c4c..c8e2ce2 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.sync.conf;
+package org.apache.iotdb.db.sync.sender.conf;
 
 public class Constans {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
rename to server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
index fd23b64..572e5df 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.sync.conf;
+package org.apache.iotdb.db.sync.sender.conf;
 
 import java.io.File;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
rename to server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java
index 2587e26..2427c10 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.sync.conf;
+package org.apache.iotdb.db.sync.sender.conf;
 
 import java.io.File;
 import java.io.FileInputStream;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/IFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/IFileManager.java
new file mode 100644
index 0000000..b1ba08f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/IFileManager.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.manage;
+
+public interface IFileManager {
+
+
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
new file mode 100644
index 0000000..3b33f78
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.recover;
+
+import java.util.Set;
+
+public interface ISyncSenderLogAnalyzer {
+
+  void loadLastLocalFiles(Set<String> set);
+
+  void loadLogger(Set<String> set);
+
+  void recover();
+
+  void clearLogger();
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
new file mode 100644
index 0000000..c516936
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.recover;
+
+public interface ISyncSenderLogger {
+
+  void startSyncDeletedFilesName();
+
+  void finishSyncDeletedFileName(String fileName);
+
+  void endSyncDeletedFilsName();
+
+  void startSyncTsFiles();
+
+  void finishSyncTsfile(String fileName);
+
+  void endSyncTsFiles();
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
rename to server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index 88d6705..59dc27e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -15,7 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.sync.sender;
+package org.apache.iotdb.db.sync.sender.transfer;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
@@ -47,9 +47,10 @@ import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.exception.SyncConnectionException;
-import org.apache.iotdb.db.sync.conf.Constans;
-import org.apache.iotdb.db.sync.conf.SyncSenderConfig;
-import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.SyncFileManager;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
 import org.apache.iotdb.db.utils.SyncUtils;
 import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
 import org.apache.iotdb.service.sync.thrift.SyncService;
@@ -65,9 +66,9 @@ import org.slf4j.LoggerFactory;
 /**
  * SyncSenderImpl is used to transfer tsfiles that needs to sync to receiver.
  */
-public class SyncSenderImpl implements SyncSender {
+public class DataTransferManager implements IDataTransferManager {
 
-  private static final Logger logger = LoggerFactory.getLogger(SyncSenderImpl.class);
+  private static final Logger logger = LoggerFactory.getLogger(DataTransferManager.class);
 
   private TTransport transport;
 
@@ -101,11 +102,11 @@ public class SyncSenderImpl implements SyncSender {
 
   private ScheduledExecutorService executorService;
 
-  private SyncSenderImpl() {
+  private DataTransferManager() {
     init();
   }
 
-  public static final SyncSenderImpl getInstance() {
+  public static final DataTransferManager getInstance() {
     return InstanceHolder.INSTANCE;
   }
 
@@ -116,7 +117,7 @@ public class SyncSenderImpl implements SyncSender {
    */
   public static void main(String[] args) throws IOException {
     Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
-    SyncSenderImpl fileSenderImpl = new SyncSenderImpl();
+    DataTransferManager fileSenderImpl = new DataTransferManager();
     fileSenderImpl.verifySingleton();
     fileSenderImpl.startMonitor();
     fileSenderImpl.startTimedTask();
@@ -524,7 +525,7 @@ public class SyncSenderImpl implements SyncSender {
 
   private static class InstanceHolder {
 
-    private static final SyncSenderImpl INSTANCE = new SyncSenderImpl();
+    private static final DataTransferManager INSTANCE = new DataTransferManager();
   }
 
   public void setConfig(SyncSenderConfig config) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSender.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSender.java
rename to server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
index d45a09e..3941a90 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSender.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.sync.sender;
+package org.apache.iotdb.db.sync.sender.transfer;
 
 import java.io.IOException;
 import java.util.Set;
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.exception.SyncConnectionException;
 /**
  * SyncSender defines the methods of a sender in sync module.
  */
-public interface SyncSender {
+public interface IDataTransferManager {
 
   /**
    * Init
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
index d395038..5a056a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
@@ -23,7 +23,7 @@ import java.text.DecimalFormat;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
 
 public class SyncUtils {
 
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
index 86b82f6..ffb1f2b 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java
@@ -35,9 +35,10 @@ import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.integration.Constant;
 import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.sync.conf.Constans;
-import org.apache.iotdb.db.sync.conf.SyncSenderConfig;
-import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.transfer.DataTransferManager;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
 import org.slf4j.Logger;
@@ -49,7 +50,7 @@ import org.slf4j.LoggerFactory;
  */
 public class SingleClientSyncTest {
 
-  SyncSenderImpl fileSenderImpl = SyncSenderImpl.getInstance();
+  DataTransferManager fileSenderImpl = DataTransferManager.getInstance();
   private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
   private String serverIpTest = "192.168.130.7";
   private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
index 9398c12..4c8356a 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/SyncFileManagerTest.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
-import org.apache.iotdb.db.sync.conf.Constans;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;


[incubator-iotdb] 04/05: complete sync sender module

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit b15ec6bf1bc5dcd9779cfc6e1de5b9afbca69691
Author: lta <li...@163.com>
AuthorDate: Thu Aug 22 11:11:56 2019 +0800

    complete sync sender module
---
 .../org/apache/iotdb/db/sync/package-info.java     |  19 +
 .../db/sync/receiver/transfer/SyncServiceImpl.java | 302 ----------------
 .../apache/iotdb/db/sync/sender/conf/Constans.java |  15 +-
 .../db/sync/sender/manage/ISyncFileManager.java    |  24 ++
 .../db/sync/sender/manage/SyncFileManager.java     |  47 ++-
 .../sender/recover/ISyncSenderLogAnalyzer.java     |   9 +-
 .../db/sync/sender/recover/ISyncSenderLogger.java  |  26 ++
 .../sync/sender/recover/SyncSenderLogAnalyzer.java |   3 +-
 .../sync/sender/transfer/DataTransferManager.java  | 397 ++++++++++++---------
 .../sync/sender/transfer/IDataTransferManager.java |  42 ++-
 10 files changed, 377 insertions(+), 507 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/package-info.java b/server/src/main/java/org/apache/iotdb/db/sync/package-info.java
index 97a4ec5..f3110e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/package-info.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/package-info.java
@@ -17,4 +17,23 @@
  * under the License.
  */
 
+/**
+ * <p>
+ * Package Sync is a tool suite that periodically uploads persisted tsfiles from the sender disk to
+ * the receiver and loads them. Together with the merge module, write, update and delete operations
+ * can be synced.
+ *
+ * On the sender side of the sync, the sync module is a separate process, independent of the IoTDB
+ * process. It can be started and closed through a separate script.
+ *
+ * On the receiver side of the sync, the sync module is embedded in the engine of IoTDB and runs in
+ * the same process as IoTDB. The receiver module listens on a separate port. Before using it, a
+ * whitelist, expressed as a network segment, must be set up at the sync receiver. The receiver
+ * only accepts the data transferred from senders located in the whitelist segment.
+ *
+ * Because the IoTDB system supports multiple data file directories, every complete synchronization
+ * task is split into sub-tasks, one per disk: hard links are needed during execution, and hard
+ * links cannot be created across disk partitions. The sub-tasks of a synchronization task are
+ * performed in turn, disk by disk.
+ */
 package org.apache.iotdb.db.sync;
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 02bb5b6..61ff2ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -391,305 +391,6 @@ public class SyncServiceImpl implements SyncService.Iface {
   }
 
   /**
-   * Get all tsfiles' info which are sent from sender, it is preparing for merging these data
-   */
-  public void getFileNodeInfo() throws IOException {
-    File dataFileRoot = new File(syncDataPath);
-    File[] files = dataFileRoot.listFiles();
-    int processedNum = 0;
-    for (File storageGroupPB : files) {
-      List<String> filesPath = new ArrayList<>();
-      File[] filesSG = storageGroupPB.listFiles();
-      for (File fileTF : filesSG) { // fileTF means TsFiles
-        Map<String, Long> startTimeMap = new HashMap<>();
-        Map<String, Long> endTimeMap = new HashMap<>();
-        TsFileSequenceReader reader = null;
-        try {
-          reader = new TsFileSequenceReader(fileTF.getPath());
-          Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-          Iterator<String> it = deviceIdMap.keySet().iterator();
-          while (it.hasNext()) {
-            String key = it.next();
-            TsDeviceMetadataIndex device = deviceIdMap.get(key);
-            startTimeMap.put(key, device.getStartTime());
-            endTimeMap.put(key, device.getEndTime());
-          }
-        } catch (IOException e) {
-          logger.error("Unable to read tsfile {}", fileTF.getPath());
-          throw new IOException(e);
-        } finally {
-          try {
-            if (reader != null) {
-              reader.close();
-            }
-          } catch (IOException e) {
-            logger.error("Cannot close tsfile stream {}", fileTF.getPath());
-            throw new IOException(e);
-          }
-        }
-        fileNodeStartTime.get().put(fileTF.getPath(), startTimeMap);
-        fileNodeEndTime.get().put(fileTF.getPath(), endTimeMap);
-        filesPath.add(fileTF.getPath());
-        processedNum++;
-        logger.info(String
-            .format("Get tsfile info has complete : %d/%d", processedNum, fileNum.get()));
-        fileNodeMap.get().put(storageGroupPB.getName(), filesPath);
-      }
-    }
-  }
-
-
-  /**
-   * It is to merge data. If data in the tsfile is new, append the tsfile to the storage group
-   * directly. If data in the tsfile is old, it has two strategy to merge.It depends on the
-   * possibility of updating historical data.
-   */
-  public void loadData() throws StorageEngineException {
-    syncDataPath = FilePathUtils.regularizePath(syncDataPath);
-    int processedNum = 0;
-    for (String storageGroup : fileNodeMap.get().keySet()) {
-      List<String> filesPath = fileNodeMap.get().get(storageGroup);
-      /**  before load external tsFile, it is necessary to order files in the same storage group **/
-      Collections.sort(filesPath, (o1, o2) -> {
-        Map<String, Long> startTimePath1 = fileNodeStartTime.get().get(o1);
-        Map<String, Long> endTimePath2 = fileNodeEndTime.get().get(o2);
-        for (Entry<String, Long> entry : endTimePath2.entrySet()) {
-          if (startTimePath1.containsKey(entry.getKey())) {
-            if (startTimePath1.get(entry.getKey()) > entry.getValue()) {
-              return 1;
-            } else {
-              return -1;
-            }
-          }
-        }
-        return 0;
-      });
-
-      for (String path : filesPath) {
-        // get startTimeMap and endTimeMap
-        Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path);
-        Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path);
-
-        // create a new fileNode
-        String header = syncDataPath;
-        String relativePath = path.substring(header.length());
-        TsFileResource fileNode = new TsFileResource(
-            new File(DirectoryManager.getInstance().getNextFolderIndexForSequenceFile() +
-                File.separator + relativePath), startTimeMap, endTimeMap
-        );
-        // call interface of load external file
-        try {
-          if (!STORAGE_GROUP_MANAGER.appendFileToStorageGroupProcessor(storageGroup, fileNode, path)) {
-            // it is a file with unsequence data
-            if (config.isUpdateHistoricalDataPossibility()) {
-              loadOldData(path);
-            } else {
-              List<String> overlapFiles = STORAGE_GROUP_MANAGER.getOverlapFiles(
-                  storageGroup,
-                  fileNode, uuid.get());
-              if (overlapFiles.isEmpty()) {
-                loadOldData(path);
-              } else {
-                loadOldData(path, overlapFiles);
-              }
-            }
-          }
-        } catch (StorageEngineException | IOException | ProcessorException e) {
-          logger.error("Can not load external file {}", path);
-          throw new StorageEngineException(e);
-        }
-        processedNum++;
-        logger.info(String
-            .format("Merging files has completed : %d/%d", processedNum, fileNum.get()));
-      }
-    }
-  }
-
-  /**
-   * Insert all data in the tsfile into IoTDB.
-   */
-  public void loadOldData(String filePath) throws IOException, ProcessorException {
-    Set<String> timeseriesSet = new HashSet<>();
-    TsFileSequenceReader reader = null;
-    QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
-    try {
-      /** use tsfile reader to get data **/
-      reader = new TsFileSequenceReader(filePath);
-      Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-      Iterator<Entry<String, TsDeviceMetadataIndex>> entryIterator = deviceIdMap.entrySet()
-          .iterator();
-      while (entryIterator.hasNext()) {
-        Entry<String, TsDeviceMetadataIndex> deviceMIEntry = entryIterator.next();
-        String deviceId = deviceMIEntry.getKey();
-        TsDeviceMetadataIndex deviceMI = deviceMIEntry.getValue();
-        TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
-        List<ChunkGroupMetaData> rowGroupMetadataList = deviceMetadata.getChunkGroupMetaDataList();
-        timeseriesSet.clear();
-        /** firstly, get all timeseries in the same device **/
-        for (ChunkGroupMetaData chunkGroupMetaData : rowGroupMetadataList) {
-          List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData
-              .getChunkMetaDataList();
-          for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
-            String measurementUID = chunkMetaData.getMeasurementUid();
-            measurementUID = deviceId + "." + measurementUID;
-            timeseriesSet.add(measurementUID);
-          }
-        }
-        /** Secondly, use tsFile Reader to form InsertPlan **/
-        ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
-        List<Path> paths = new ArrayList<>();
-        paths.clear();
-        for (String timeseries : timeseriesSet) {
-          paths.add(new Path(timeseries));
-        }
-        QueryExpression queryExpression = QueryExpression.create(paths, null);
-        QueryDataSet queryDataSet = readTsFile.query(queryExpression);
-        while (queryDataSet.hasNext()) {
-          RowRecord record = queryDataSet.next();
-          List<Field> fields = record.getFields();
-          List<String> measurementList = new ArrayList<>();
-          List<String> insertValues = new ArrayList<>();
-          for (int i = 0; i < fields.size(); i++) {
-            Field field = fields.get(i);
-            if (!field.isNull()) {
-              measurementList.add(paths.get(i).getMeasurement());
-              if (fields.get(i).getDataType() == TSDataType.TEXT) {
-                insertValues.add(String.format("'%s'", field.toString()));
-              } else {
-                insertValues.add(String.format("%s", field.toString()));
-              }
-            }
-          }
-          if (insertExecutor.insert(new InsertPlan(deviceId, record.getTimestamp(),
-              measurementList.toArray(new String[0]), insertValues.toArray(new String[0])))) {
-            throw new IOException("Inserting series data to IoTDB engine has failed.");
-          }
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Can not parse tsfile into SQL", e);
-      throw new IOException(e);
-    } catch (ProcessorException e) {
-      logger.error("Meet error while processing non-query.");
-      throw new ProcessorException(e);
-    } finally {
-      try {
-        if (reader != null) {
-          reader.close();
-        }
-      } catch (IOException e) {
-        logger.error("Cannot close file stream {}", filePath, e);
-      }
-    }
-  }
-
-  /**
-   * Insert those valid data in the tsfile into IoTDB
-   *
-   * @param overlapFiles:files which are conflict with the sync file
-   */
-  public void loadOldData(String filePath, List<String> overlapFiles)
-      throws IOException, ProcessorException {
-    Set<String> timeseriesList = new HashSet<>();
-    QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
-    Map<String, ReadOnlyTsFile> tsfilesReaders = openReaders(filePath, overlapFiles);
-    try {
-      TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
-      Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-      Iterator<String> it = deviceIdMap.keySet().iterator();
-      while (it.hasNext()) {
-        String deviceID = it.next();
-        TsDeviceMetadataIndex deviceMI = deviceIdMap.get(deviceID);
-        TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
-        List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
-            .getChunkGroupMetaDataList();
-        timeseriesList.clear();
-        /** firstly, get all timeseries in the same device **/
-        for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
-          List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData.getChunkMetaDataList();
-          for (ChunkMetaData timeSeriesChunkMetaData : chunkMetaDataList) {
-            String measurementUID = timeSeriesChunkMetaData.getMeasurementUid();
-            measurementUID = deviceID + "." + measurementUID;
-            timeseriesList.add(measurementUID);
-          }
-        }
-        reader.close();
-
-        /** secondly, use tsFile Reader to form SQL **/
-        ReadOnlyTsFile readOnlyTsFile = tsfilesReaders.get(filePath);
-        List<Path> paths = new ArrayList<>();
-        /** compare data with one timeseries in a round to get valid data **/
-        for (String timeseries : timeseriesList) {
-          paths.clear();
-          paths.add(new Path(timeseries));
-          Set<InsertPlan> originDataPoints = new HashSet<>();
-          QueryExpression queryExpression = QueryExpression.create(paths, null);
-          QueryDataSet queryDataSet = readOnlyTsFile.query(queryExpression);
-          Set<InsertPlan> newDataPoints = convertToInserPlans(queryDataSet, paths, deviceID);
-
-          /** get all data with the timeseries in all overlap files. **/
-          for (String overlapFile : overlapFiles) {
-            ReadOnlyTsFile readTsFileOverlap = tsfilesReaders.get(overlapFile);
-            QueryDataSet queryDataSetOverlap = readTsFileOverlap.query(queryExpression);
-            originDataPoints.addAll(convertToInserPlans(queryDataSetOverlap, paths, deviceID));
-          }
-
-          /** If there has no overlap data with the timeseries, inserting all data in the sync file **/
-          if (originDataPoints.isEmpty()) {
-            for (InsertPlan insertPlan : newDataPoints) {
-              if (insertExecutor.insert(insertPlan)) {
-                throw new IOException("Inserting series data to IoTDB engine has failed.");
-              }
-            }
-          } else {
-            /** Compare every data to get valid data **/
-            for (InsertPlan insertPlan : newDataPoints) {
-              if (!originDataPoints.contains(insertPlan)) {
-                if (insertExecutor.insert(insertPlan)) {
-                  throw new IOException("Inserting series data to IoTDB engine has failed.");
-                }
-              }
-            }
-          }
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Can not parse tsfile into SQL", e);
-      throw new IOException(e);
-    } catch (ProcessorException e) {
-      logger.error("Meet error while processing non-query.", e);
-      throw new ProcessorException(e);
-    } finally {
-      try {
-        closeReaders(tsfilesReaders);
-      } catch (IOException e) {
-        logger.error("Cannot close file stream {}", filePath, e);
-      }
-    }
-  }
-
-  private Set<InsertPlan> convertToInserPlans(QueryDataSet queryDataSet, List<Path> paths, String deviceID) throws IOException {
-    Set<InsertPlan> plans = new HashSet<>();
-    while (queryDataSet.hasNext()) {
-      RowRecord record = queryDataSet.next();
-      List<Field> fields = record.getFields();
-      /** get all data with the timeseries in the sync file **/
-      for (int i = 0; i < fields.size(); i++) {
-        Field field = fields.get(i);
-        String[] measurementList = new String[1];
-        if (!field.isNull()) {
-          measurementList[0] = paths.get(i).getMeasurement();
-          InsertPlan insertPlan = new InsertPlan(deviceID, record.getTimestamp(),
-              measurementList, new String[]{field.getDataType() == TSDataType.TEXT ? String.format("'%s'", field.toString())
-              : field.toString()});
-          plans.add(insertPlan);
-        }
-      }
-    }
-    return plans;
-  }
-
-  /**
    * Open all tsfile reader and cache
    */
   private Map<String, ReadOnlyTsFile> openReaders(String filePath, List<String> overlapFiles)
@@ -716,11 +417,8 @@ public class SyncServiceImpl implements SyncService.Iface {
    */
   @Override
   public void cleanUp() {
-    uuid.remove();
     fileNum.remove();
     fileNodeMap.remove();
-    fileNodeStartTime.remove();
-    fileNodeEndTime.remove();
     schemaFromSenderPath.remove();
     try {
       FileUtils.deleteDirectory(new File(syncFolderPath));
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index dfe3c52..dffa1e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -27,15 +27,22 @@ public class Constans {
   public static final String SYNC_SENDER = "sync-sender";
   public static final String SYNC_RECEIVER = "sync-receiver";
 
+  public static final String MESSAGE_DIGIT_NAME = "MD5";
+  public static final String SYNC_DIR_NAME_SEPARATOR = "_";
+
+  // sender section
+
   public static final String LOCK_FILE_NAME = "sync_lock";
+
   public static final String SCHEMA_POS_FILE_NAME = "sync_schema_pos";
+
   public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
+
   public static final String CURRENT_LOCAL_FILE_NAME = "current_local_files.txt";
+
   public static final String DATA_SNAPSHOT_NAME = "snapshot";
-  public static final String SYNC_LOG_NAME = "sync.log";
 
-  public static final String MESSAGE_DIGIT_NAME = "MD5";
-  public static final String SYNC_DIR_NAME_SEPARATOR = "_";
+  public static final String SYNC_LOG_NAME = "sync.log";
 
   /**
    * Split data file , block size at each transmission
@@ -43,7 +50,7 @@ public class Constans {
   public static final int DATA_CHUNK_SIZE = 64 * 1024 * 1024;
 
   /**
-   * Max try when syncing the same file to receiver fails.
+   * Maximum try when syncing the same file to receiver fails.
    */
   public static final int MAX_SYNC_FILE_TRY = 5;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
index 1a529be..684db79 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
@@ -21,11 +21,35 @@ package org.apache.iotdb.db.sync.sender.manage;
 import java.io.File;
 import java.io.IOException;
 
+/**
+ * This interface is used to manage deleted files and new closed files that need to be synchronized in each
+ * sync task.
+ */
 public interface ISyncFileManager {
 
+  /**
+   * Find out all closed and unmodified files, i.e. those that have a .resource file and don't
+   * have a .mod file. For these files, they will eventually generate a new tsfile file as the merge
+   * operation is executed and executed in subsequent synchronization tasks.
+   *
+   * @param dataDir data directory
+   */
   void getCurrentLocalFiles(String dataDir);
 
+  /**
+   * Load last local files from the file {@code lastLocalFile}, which does not contain those tsfiles
+   * that were not synced successfully in previous sync tasks.
+   *
+   * @param lastLocalFile last local file, which may not exist in first sync task.
+   */
   void getLastLocalFiles(File lastLocalFile) throws IOException;
 
+  /**
+   * Based on current local files and last local files, we can distinguish two kinds of files
+   * between them, one is deleted files, the other is new files. These two kinds of files are valid
+   * files that need to be synchronized to the receiving end.
+   *
+   * @param dataDir data directory
+   */
   void getValidFiles(String dataDir) throws IOException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
index c08b64f..23af20c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -39,12 +39,32 @@ public class SyncFileManager implements ISyncFileManager {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SyncFileManager.class);
 
+  /**
+   * All storage groups on the disk where the current sync task is executed
+   */
+  private Set<String> allSG;
+
+  /**
+   * Key is storage group, value is the set of current sealed tsfile in the sg.
+   */
   private Map<String, Set<File>> currentSealedLocalFilesMap;
 
+  /**
+   * Key is storage group, value is the set of last local tsfiles in the sg, which doesn't contain
+   * those tsfiles which are not synced successfully.
+   */
   private Map<String, Set<File>> lastLocalFilesMap;
 
+  /**
+   * Key is storage group, value is the valid set of deleted tsfiles which need to be synced to
+   * receiver end in the sg.
+   */
   private Map<String, Set<File>> deletedFilesMap;
 
+  /**
+   * Key is storage group, value is the valid set of new tsfiles which need to be synced to
+   * receiver end in the sg.
+   */
   private Map<String, Set<File>> toBeSyncedFilesMap;
 
   private SyncFileManager() {
@@ -58,15 +78,18 @@ public class SyncFileManager implements ISyncFileManager {
   @Override
   public void getCurrentLocalFiles(String dataDir) {
     LOGGER.info("Start to get current local files in data folder {}", dataDir);
+
     // get all files in data dir sequence folder
     Map<String, Set<File>> currentAllLocalFiles = new HashMap<>();
     File[] allSGFolders = new File(
         dataDir + File.separatorChar + IoTDBConstant.SEQUENCE_FLODER_NAME)
         .listFiles();
     for (File sgFolder : allSGFolders) {
+      allSG.add(sgFolder.getName());
       currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashSet<>());
-      Arrays.stream(sgFolder.listFiles()).forEach(file -> currentAllLocalFiles.get(sgFolder.getName())
-          .add(new File(sgFolder.getAbsolutePath(), file.getName())));
+      Arrays.stream(sgFolder.listFiles())
+          .forEach(file -> currentAllLocalFiles.get(sgFolder.getName())
+              .add(new File(sgFolder.getAbsolutePath(), file.getName())));
     }
 
     // get sealed tsfiles
@@ -92,10 +115,14 @@ public class SyncFileManager implements ISyncFileManager {
     LOGGER.info("Start to get last local files from last local file info {}",
         lastLocalFileInfo.getAbsoluteFile());
     lastLocalFilesMap = new HashMap<>();
+    if(!lastLocalFileInfo.exists()){
+      return;
+    }
     try (BufferedReader reader = new BufferedReader(new FileReader(lastLocalFileInfo))) {
       String fileName;
       while ((fileName = reader.readLine()) != null) {
         String sgName = new File(fileName).getParent();
+        allSG.add(sgName);
         lastLocalFilesMap.putIfAbsent(sgName, new HashSet<>());
         lastLocalFilesMap.get(sgName).add(new File(fileName));
       }
@@ -104,21 +131,21 @@ public class SyncFileManager implements ISyncFileManager {
 
   @Override
   public void getValidFiles(String dataDir) throws IOException {
+    allSG = new HashSet<>();
     getCurrentLocalFiles(dataDir);
     getLastLocalFiles(new File(SyncSenderDescriptor.getInstance().getConfig().getLastFileInfo()));
     toBeSyncedFilesMap = new HashMap<>();
     deletedFilesMap = new HashMap<>();
-    for(Entry<String, Set<File>> entry: currentSealedLocalFilesMap.entrySet()){
-      String sgName = entry.getKey();
+    for (String sgName : allSG) {
       toBeSyncedFilesMap.putIfAbsent(sgName, new HashSet<>());
       deletedFilesMap.putIfAbsent(sgName, new HashSet<>());
-      for(File newFile:currentSealedLocalFilesMap.get(sgName)){
-        if(!lastLocalFilesMap.get(sgName).contains(newFile)){
+      for (File newFile : currentSealedLocalFilesMap.getOrDefault(sgName, new HashSet<>())) {
+        if (!lastLocalFilesMap.getOrDefault(sgName, new HashSet<>()).contains(newFile)) {
           toBeSyncedFilesMap.get(sgName).add(newFile);
         }
       }
-      for(File oldFile:lastLocalFilesMap.get(sgName)){
-        if(!currentSealedLocalFilesMap.get(sgName).contains(oldFile)){
+      for (File oldFile : lastLocalFilesMap.getOrDefault(sgName, new HashSet<>())) {
+        if (!currentSealedLocalFilesMap.getOrDefault(sgName, new HashSet<>()).contains(oldFile)) {
           deletedFilesMap.get(sgName).add(oldFile);
         }
       }
@@ -137,6 +164,10 @@ public class SyncFileManager implements ISyncFileManager {
     return toBeSyncedFilesMap;
   }
 
+  public Set<String> getAllSG() {
+    return allSG;
+  }
+
   private static class SyncFileManagerHolder {
 
     private static final SyncFileManager INSTANCE = new SyncFileManager();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
index 53cb54f..d0c09b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
@@ -20,14 +20,19 @@ package org.apache.iotdb.db.sync.sender.recover;
 
 import java.util.Set;
 
+/**
+ * This interface is used to restore and clean up the status of the historical synchronization task
+ * with abnormal termination. Through the analysis of the synchronization task log, the completed
+ * progress is merged to prepare for the next synchronization task.
+ */
 public interface ISyncSenderLogAnalyzer {
 
+  void recover();
+
   void loadLastLocalFiles(Set<String> lastLocalFiles);
 
   void loadLogger(Set<String> deletedFiles, Set<String> newFiles);
 
-  void recover();
-
   void clearLogger(Set<String> currentLocalFiles);
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
index d1cdf9e..0229579 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
@@ -21,14 +21,40 @@ package org.apache.iotdb.db.sync.sender.recover;
 import java.io.File;
 import java.io.IOException;
 
+/**
+ * This interface is used to log progress in the process of synchronization tasks. If the
+ * synchronization tasks are completed normally and there are no exceptions, the log records will be
+ * deleted; otherwise, the status can be restored according to the log at the start of each task. It
+ * ensures the correctness of synchronization module when a system crash or network abnormality
+ * occurs.
+ */
 public interface ISyncSenderLogger {
 
+  /**
+   * Start to sync the names of deleted files.
+   * @throws IOException
+   */
   void startSyncDeletedFilesName() throws IOException;
 
+  /**
+   * After a deleted file name is synced to the receiver end, record it in sync log.
+   * @param file the deleted tsfile
+   * @throws IOException
+   */
   void finishSyncDeletedFileName(File file) throws IOException;
 
+  /**
+   * Start to sync new tsfiles.
+   * @throws IOException
+   */
   void startSyncTsFiles() throws IOException;
 
+  /**
+   *
+   * After a new tsfile is synced to the receiver end, record it in sync log.
+   * @param file new tsfile
+   * @throws IOException
+   */
   void finishSyncTsfile(File file) throws IOException;
 
   void close() throws IOException;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
index 9649806..3acfd66 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer{
+public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderLogAnalyzer.class);
   private File currentLocalFile;
@@ -109,6 +109,7 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer{
     } catch (IOException e) {
       LOGGER.error("Can not clear sync log {}", syncLogFile.getAbsoluteFile(), e);
     }
+    lastLocalFile.deleteOnExit();
     currentLocalFile.renameTo(lastLocalFile);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index fc68960..12803f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -1,4 +1,21 @@
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.sender.transfer;
 
 import java.io.BufferedReader;
@@ -19,15 +36,18 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
@@ -60,30 +80,38 @@ public class DataTransferManager implements IDataTransferManager {
 
   private static final int BATCH_LINE = 1000;
 
+  /**
+   * When transferring schema information, it is a better choice to transfer only new schema
+   * information, avoiding duplicate data transmission. The schema log is self-increasing, so the
+   * location is recorded once after each synchronization task for the next synchronization task to
+   * use.
+   */
   private int schemaFileLinePos;
 
   private TTransport transport;
 
   private SyncService.Client serviceClient;
 
-  /**
-   * Files that need to be synchronized
-   */
+  private Set<String> allSG;
+
   private Map<String, Set<File>> toBeSyncedFilesMap;
 
   private Map<String, Set<File>> deletedFilesMap;
 
-  private Map<String, Set<File>> sucessSyncedFilesMap;
+  private Map<String, Set<File>> lastLocalFilesMap;
 
-  private Map<String, Set<File>> successDeletedFilesMap;
+  private Map<String, Set<File>> successSyncedFilesMap = new HashMap<>();
 
-  private Map<String, Set<File>> lastLocalFilesMap;
+  private Map<String, Set<File>> successDeletedFilesMap = new HashMap<>();
 
   /**
    * If true, sync is in execution.
    **/
   private volatile boolean syncStatus = false;
 
+  /**
+   * Record sync progress in log.
+   */
   private SyncSenderLogger syncLog;
 
   private SyncFileManager syncFileManager = SyncFileManager.getInstance();
@@ -99,7 +127,7 @@ public class DataTransferManager implements IDataTransferManager {
   }
 
   /**
-   * Create a sender and sync files to the receiver.
+   * Create a sender and sync files to the receiver periodically.
    */
   public static void main(String[] args) throws IOException {
     Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
@@ -109,10 +137,9 @@ public class DataTransferManager implements IDataTransferManager {
     fileSenderImpl.startTimedTask();
   }
 
-
   /**
-   * The method is to verify whether the client lock file is locked or not, ensuring that only one
-   * client is running.
+   * Verify whether the client lock file is locked or not, ensuring that only one client is
+   * running.
    */
   private void verifySingleton() throws IOException {
     File lockFile = new File(config.getLockFilePath());
@@ -165,7 +192,7 @@ public class DataTransferManager implements IDataTransferManager {
   }
 
   /**
-   * Start Monitor Thread, monitor sync status
+   * Start monitor thread, which monitors sync status.
    */
   private void startMonitor() {
     executorService.scheduleWithFixedDelay(() -> {
@@ -195,11 +222,12 @@ public class DataTransferManager implements IDataTransferManager {
     executorService = null;
   }
 
+  @Override
   public void syncAll() throws SyncConnectionException, IOException, TException {
 
     // 1. Connect to sync receiver and confirm identity
     establishConnection(config.getServerIp(), config.getServerPort());
-    if (!confirmIdentity(config.getSenderPath())) {
+    if (!confirmIdentity()) {
       logger.error("Sorry, you do not have the permission to connect to sync receiver.");
       System.exit(1);
     }
@@ -212,16 +240,18 @@ public class DataTransferManager implements IDataTransferManager {
     for (String dataDir : dataDirs) {
       logger.info("Start to sync data in data dir {}", dataDir);
       config.update(dataDir);
-      SyncFileManager.getInstance().getValidFiles(dataDir);
-      lastLocalFilesMap = SyncFileManager.getInstance().getLastLocalFilesMap();
-      deletedFilesMap = SyncFileManager.getInstance().getDeletedFilesMap();
-      toBeSyncedFilesMap = SyncFileManager.getInstance().getToBeSyncedFilesMap();
+      syncFileManager.getValidFiles(dataDir);
+      allSG = syncFileManager.getAllSG();
+      lastLocalFilesMap = syncFileManager.getLastLocalFilesMap();
+      deletedFilesMap = syncFileManager.getDeletedFilesMap();
+      toBeSyncedFilesMap = syncFileManager.getToBeSyncedFilesMap();
       checkRecovery();
       if (SyncUtils.isEmpty(deletedFilesMap) && SyncUtils.isEmpty(toBeSyncedFilesMap)) {
         logger.info("There has no data to sync in data dir {}", dataDir);
         continue;
       }
       sync();
+      endSync();
       logger.info("Finish to sync data in data dir {}", dataDir);
     }
 
@@ -236,31 +266,161 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
+  private void checkRecovery() {
+    new SyncSenderLogAnalyzer(config.getSenderPath()).recover();
+  }
+
+  @Override
+  public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
+    transport = new TSocket(serverIp, serverPort);
+    TProtocol protocol = new TBinaryProtocol(transport);
+    serviceClient = new SyncService.Client(protocol);
+    try {
+      transport.open();
+    } catch (TTransportException e) {
+      logger.error("Cannot connect to the receiver.");
+      throw new SyncConnectionException(e);
+    }
+  }
+
+  @Override
+  public boolean confirmIdentity() throws SyncConnectionException {
+    try {
+      return serviceClient.checkIdentity(InetAddress.getLocalHost().getHostAddress())
+          == ResultStatus.SUCCESS;
+    } catch (Exception e) {
+      logger.error("Cannot confirm identity with the receiver.");
+      throw new SyncConnectionException(e);
+    }
+  }
+
+  @Override
+  public void syncSchema() throws SyncConnectionException, TException {
+    int retryCount = 0;
+    serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
+    while (true) {
+      if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
+        throw new SyncConnectionException(String
+            .format("Can not sync schema after %s retries.", Constans.MAX_SYNC_FILE_TRY));
+      }
+      try {
+        if (tryToSyncSchema()) {
+          writeSyncSchemaPos(getSchemaPosFile());
+          break;
+        }
+      } finally {
+        retryCount++;
+      }
+    }
+  }
+
+  private boolean tryToSyncSchema() {
+    int schemaPos = readSyncSchemaPos(getSchemaPosFile());
+
+    // start to sync file data and get md5 of this file.
+    try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
+      schemaFileLinePos = 0;
+      while (schemaFileLinePos++ <= schemaPos) {
+        br.readLine();
+      }
+      MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
+      String line;
+      int cntLine = 0;
+      while ((line = br.readLine()) != null) {
+        schemaFileLinePos++;
+        byte[] singleLineData = BytesUtils.stringToBytes(line);
+        bos.write(singleLineData);
+        md.update(singleLineData);
+        if (cntLine++ == BATCH_LINE) {
+          ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+          bos.reset();
+          if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
+            logger.error("Receiver failed to receive metadata, retry.");
+            return false;
+          }
+          cntLine = 0;
+        }
+      }
+      if (bos.size() != 0) {
+        ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+        bos.reset();
+        if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
+          logger.error("Receiver failed to receive metadata, retry.");
+          return false;
+        }
+      }
+
+      // check md5
+      return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16));
+    } catch (NoSuchAlgorithmException | IOException | TException e) {
+      logger.error("Can not finish transfer schema to receiver", e);
+      return false;
+    }
+  }
+
   /**
-   * Execute a sync task.
+   * Check MD5 of schema to make sure that the receiver receives the schema correctly
    */
+  private boolean checkMD5ForSchema(String md5OfSender) throws TException {
+    String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
+    if (md5OfSender.equals(md5OfReceiver)) {
+      logger.info("Receiver has received schema successfully, retry.");
+      return true;
+    } else {
+      logger
+          .error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
+      return false;
+    }
+  }
+
+  private int readSyncSchemaPos(File syncSchemaLogFile) {
+    try {
+      if (syncSchemaLogFile.exists()) {
+        try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
+          return Integer.parseInt(br.readLine());
+        }
+      }
+    } catch (IOException e) {
+      logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
+    }
+    return 0;
+  }
+
+  private void writeSyncSchemaPos(File syncSchemaLogFile) {
+    try {
+      if (!syncSchemaLogFile.exists()) {
+        syncSchemaLogFile.createNewFile();
+      }
+      try (BufferedWriter br = new BufferedWriter(new FileWriter(syncSchemaLogFile))) {
+        br.write(Integer.toString(schemaFileLinePos));
+      }
+    } catch (IOException e) {
+      logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
+    }
+  }
+
   @Override
   public void sync() throws IOException {
     try {
       syncStatus = true;
       syncLog = new SyncSenderLogger(getSchemaLogFile());
+      successSyncedFilesMap = new HashMap<>();
+      successDeletedFilesMap = new HashMap<>();
 
-      // 1. Sync data
-      for (Entry<String, Set<File>> entry : deletedFilesMap.entrySet()) {
-        checkRecovery();
+      for (String sgName : allSG) {
         syncLog = new SyncSenderLogger(getSyncLogFile());
-        // TODO deal with the situation
         try {
-          if (serviceClient.init(entry.getKey()) == ResultStatus.FAILURE) {
+          if (serviceClient.init(sgName) == ResultStatus.FAILURE) {
             throw new SyncConnectionException("unable init receiver");
           }
         } catch (TException | SyncConnectionException e) {
           throw new SyncConnectionException("Unable to connect to receiver", e);
         }
-        logger.info("Sync process starts to transfer data of storage group {}", entry.getKey());
-        syncDeletedFilesName(entry.getKey(), entry.getValue());
-        syncDataFilesInOneGroup(entry.getKey(), entry.getValue());
-        clearSyncLog();
+        logger.info("Sync process starts to transfer data of storage group {}", sgName);
+        syncDeletedFilesNameInOneGroup(sgName,
+            deletedFilesMap.getOrDefault(sgName, new HashSet<>()));
+        syncDataFilesInOneGroup(sgName, toBeSyncedFilesMap.getOrDefault(sgName, new HashSet<>()));
       }
 
     } catch (SyncConnectionException e) {
@@ -273,12 +433,9 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
-  private void checkRecovery() {
-    new SyncSenderLogAnalyzer(config.getSenderPath()).recover();
-  }
-
   @Override
-  public void syncDeletedFilesName(String sgName, Set<File> deletedFilesName) throws IOException {
+  public void syncDeletedFilesNameInOneGroup(String sgName, Set<File> deletedFilesName)
+      throws IOException {
     if (deletedFilesName.isEmpty()) {
       logger.info("There has no deleted files to be synced in storage group {}", sgName);
       return;
@@ -313,7 +470,8 @@ public class DataTransferManager implements IDataTransferManager {
       try {
         snapshotFile = makeFileSnapshot(tsfile);
         syncSingleFile(snapshotFile);
-        sucessSyncedFilesMap.get(sgName).add(tsfile);
+        syncSingleFile(new File(snapshotFile, TsFileResource.RESOURCE_SUFFIX));
+        successSyncedFilesMap.get(sgName).add(tsfile);
         syncLog.finishSyncTsfile(tsfile);
         logger.info("Task of synchronization has completed {}/{}.", cnt, toBeSyncFiles.size());
       } catch (IOException e) {
@@ -329,6 +487,11 @@ public class DataTransferManager implements IDataTransferManager {
     logger.info("Sync process has finished storage group {}.", sgName);
   }
 
+  /**
+   * Make a snapshot (hard link) for the new tsfile and its .resource file.
+   *
+   * @param file new tsfile to be synced
+   */
   private File makeFileSnapshot(File file) throws IOException {
     File snapshotFile = SyncUtils.getSnapshotFile(file);
     if (!snapshotFile.getParentFile().exists()) {
@@ -337,11 +500,16 @@ public class DataTransferManager implements IDataTransferManager {
     Path link = FileSystems.getDefault().getPath(snapshotFile.getAbsolutePath());
     Path target = FileSystems.getDefault().getPath(snapshotFile.getAbsolutePath());
     Files.createLink(link, target);
+    link = FileSystems.getDefault()
+        .getPath(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    target = FileSystems.getDefault()
+        .getPath(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+    Files.createLink(link, target);
     return snapshotFile;
   }
 
   /**
-   * Transfer data of a storage group to receiver.
+   * Transfer data of a tsfile to the receiver.
    */
   private void syncSingleFile(File snapshotFile) throws SyncConnectionException {
     try {
@@ -387,156 +555,18 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
-
-  /**
-   * Establish a connection between the sender and the receiver.
-   *
-   * @param serverIp the ip address of the receiver
-   * @param serverPort must be same with port receiver set.
-   */
-  @Override
-  public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
-    transport = new TSocket(serverIp, serverPort);
-    TProtocol protocol = new TBinaryProtocol(transport);
-    serviceClient = new SyncService.Client(protocol);
-    try {
-      transport.open();
-    } catch (TTransportException e) {
-      logger.error("Cannot connect to server");
-      throw new SyncConnectionException(e);
-    }
-  }
-
-  /**
-   * UUID marks the identity of sender for receiver.
-   */
-  @Override
-  public boolean confirmIdentity(String uuidPath) throws SyncConnectionException {
-    try {
-      return serviceClient.checkIdentity(InetAddress.getLocalHost().getHostAddress())
-          == ResultStatus.SUCCESS;
-    } catch (Exception e) {
-      logger.error("Cannot confirm identity with receiver");
-      throw new SyncConnectionException(e);
-    }
-  }
-
-  /**
-   * Sync schema with receiver.
-   */
-  @Override
-  public void syncSchema() throws SyncConnectionException, TException {
-    int retryCount = 0;
-    serviceClient.initSyncData(MetadataConstant.METADATA_LOG);
-    while (true) {
-      if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
-        throw new SyncConnectionException(String
-            .format("Can not sync schema after %s tries.", Constans.MAX_SYNC_FILE_TRY));
-      }
-      try {
-        if (tryToSyncSchema()) {
-          writeSyncSchemaPos(getSchemaPosFile());
-          break;
-        }
-      } finally {
-        retryCount++;
-      }
-    }
-  }
-
-  private boolean tryToSyncSchema() {
-    int schemaPos = readSyncSchemaPos(getSchemaPosFile());
-
-    // start to sync file data and get md5 of this file.
-    try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile()));
-        ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) {
-      schemaFileLinePos = 0;
-      while (schemaFileLinePos++ <= schemaPos) {
-        br.readLine();
-      }
-      MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
-      String line;
-      int cntLine = 0;
-      while ((line = br.readLine()) != null) {
-        schemaFileLinePos++;
-        byte[] singleLineData = BytesUtils.stringToBytes(line);
-        bos.write(singleLineData);
-        md.update(singleLineData);
-        if (cntLine++ == BATCH_LINE) {
-          ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
-          bos.reset();
-          // PROCESSING_STATUS represents there is still schema buffer to send.
-          if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
-            logger.error("Receiver failed to receive metadata, retry.");
-            return false;
-          }
-          cntLine = 0;
-        }
-      }
-      if (bos.size() != 0) {
-        ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
-        bos.reset();
-        if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
-          logger.error("Receiver failed to receive metadata, retry.");
-          return false;
-        }
-      }
-
-      // check md5
-      return checkMD5ForSchema((new BigInteger(1, md.digest())).toString(16));
-    } catch (NoSuchAlgorithmException | IOException | TException e) {
-      logger.error("Can not finish transfer schema to receiver", e);
-      return false;
-    }
-  }
-
-
-  private boolean checkMD5ForSchema(String md5OfSender) throws TException {
-    String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
-    if (md5OfSender.equals(md5OfReceiver)) {
-      logger.info("Receiver has received schema successfully, retry.");
-      return true;
-    } else {
-      logger
-          .error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
-      return false;
-    }
-  }
-
-  private int readSyncSchemaPos(File syncSchemaLogFile) {
-    try {
-      if (syncSchemaLogFile.exists()) {
-        try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
-          return Integer.parseInt(br.readLine());
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
-    }
-    return 0;
-  }
-
-  private void writeSyncSchemaPos(File syncSchemaLogFile) {
-    try {
-      if (syncSchemaLogFile.exists()) {
-        try (BufferedWriter br = new BufferedWriter(new FileWriter(syncSchemaLogFile))) {
-          br.write(Integer.toString(schemaFileLinePos));
-        }
-      }
-    } catch (IOException e) {
-      logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
-    }
-  }
-
-  private void clearSyncLog() {
+  private void endSync() {
+    // 1. Organize current local files based on sync result
     for (Entry<String, Set<File>> entry : lastLocalFilesMap.entrySet()) {
       entry.getValue()
           .removeAll(successDeletedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
       entry.getValue()
-          .removeAll(sucessSyncedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
+          .removeAll(successSyncedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
     }
     File currentLocalFile = getCurrentLogFile();
     File lastLocalFile = new File(config.getLastFileInfo());
+
+    // 2. Write file list to currentLocalFile
     try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
       for (Set<File> currentLocalFiles : lastLocalFilesMap.values()) {
         for (File file : currentLocalFiles) {
@@ -548,7 +578,20 @@ public class DataTransferManager implements IDataTransferManager {
     } catch (IOException e) {
       logger.error("Can not clear sync log {}", lastLocalFile.getAbsoluteFile(), e);
     }
+
+    // 3. Rename currentLocalFile to lastLocalFile
+    lastLocalFile.deleteOnExit();
     currentLocalFile.renameTo(lastLocalFile);
+
+    // 4. delete snapshot directory
+    try {
+      FileUtils.deleteDirectory(new File(config.getSnapshotPath()));
+    } catch (IOException e) {
+      logger.error("Can not clear snapshot directory {}", config.getSnapshotPath(), e);
+    }
+
+    // 5. delete sync log file
+    getSyncLogFile().deleteOnExit();
   }
 
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
index 3366b91..bff3f2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
@@ -25,45 +25,61 @@ import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.thrift.TException;
 
 /**
- * SyncSender defines the methods of a sender in sync module.
+ * This interface defines the data transmission part of a synchronization task, which is also the
+ * most important part of the task. The files to be transferred are screened out by
+ * {@code SyncFileManager}; implementations of this interface then send those files to the
+ * receiving end to complete the synchronization task.
  */
 public interface IDataTransferManager {
 
-  /**
-   * Init
-   */
   void init();
 
   /**
-   * Connect to server.
+   * Establish a connection to the receiver end.
    */
   void establishConnection(String serverIp, int serverPort) throws SyncConnectionException;
 
   /**
-   * Transfer UUID to receiver.
+   * Confirm identity: the receiver checks whether the sender has synchronization privileges.
    */
-  boolean confirmIdentity(String uuidPath) throws SyncConnectionException, IOException;
+  boolean confirmIdentity() throws SyncConnectionException, IOException;
 
   /**
-   * Send schema file to receiver.
+   * Sync the schema file to the receiver before any data is synced.
    */
   void syncSchema() throws SyncConnectionException, TException;
 
-  void syncDeletedFilesName(String sgName, Set<File> deletedFilesName)
+  /**
+   * For deleted files in a storage group, sync them to receiver side and load these data in
+   * receiver.
+   *
+   * @param sgName storage group name
+   * @param deletedFilesName set of deleted files
+   */
+  void syncDeletedFilesNameInOneGroup(String sgName, Set<File> deletedFilesName)
       throws SyncConnectionException, IOException;
 
   /**
-   * For all valid files, send it to receiver side and load these data in receiver.
+   * Execute a sync task for all data directories.
    */
-  void syncDataFilesInOneGroup(String sgName, Set<File> deletedFilesName)
-      throws SyncConnectionException, IOException;
+  void syncAll() throws SyncConnectionException, IOException, TException;
 
   /**
-   * Execute a sync task.
+   * Execute a sync task for a data directory.
    */
   void sync() throws SyncConnectionException, IOException;
 
   /**
+   * For new valid files in a storage group, sync them to receiver side and load these data in
+   * receiver.
+   *
+   * @param sgName storage group name
+   * @param toBeSyncFiles set of new tsfiles to be synced
+   */
+  void syncDataFilesInOneGroup(String sgName, Set<File> toBeSyncFiles)
+      throws SyncConnectionException, IOException;
+
+  /**
    * Stop sync process
    */
   void stop();