Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/08/22 03:14:14 UTC

[incubator-iotdb] 03/05: complete all sender module

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7b57574ce0e47b6457d2cfe6de086f0d30bba029
Author: lta <li...@163.com>
AuthorDate: Wed Aug 21 17:03:52 2019 +0800

    complete all sender module
---
 .../receiver/transfer/SyncServiceImplBackup.java   | 736 ---------------------
 .../apache/iotdb/db/sync/sender/conf/Constans.java |   2 +-
 .../db/sync/sender/manage/ISyncFileManager.java    |   4 -
 .../db/sync/sender/manage/SyncFileManager.java     |   9 -
 .../sender/recover/ISyncSenderLogAnalyzer.java     |   6 +-
 .../db/sync/sender/recover/ISyncSenderLogger.java  |   8 -
 .../sync/sender/recover/SyncSenderLogAnalyzer.java | 114 ++++
 .../db/sync/sender/recover/SyncSenderLogger.java   |  50 +-
 .../sync/sender/transfer/DataTransferManager.java  | 276 ++++----
 .../sync/sender/transfer/IDataTransferManager.java |  10 +-
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  |   7 +-
 11 files changed, 255 insertions(+), 967 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java
deleted file mode 100644
index 6f2f754..0000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImplBackup.java
+++ /dev/null
@@ -1,736 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied.  See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// */
-//package org.apache.iotdb.db.sync.receiver.transfer;
-//
-//import java.io.BufferedReader;
-//import java.io.File;
-//import java.io.FileInputStream;
-//import java.io.FileNotFoundException;
-//import java.io.FileOutputStream;
-//import java.io.IOException;
-//import java.math.BigInteger;
-//import java.nio.ByteBuffer;
-//import java.nio.channels.FileChannel;
-//import java.security.MessageDigest;
-//import java.util.ArrayList;
-//import java.util.Collections;
-//import java.util.HashMap;
-//import java.util.HashSet;
-//import java.util.Iterator;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.Map.Entry;
-//import java.util.Set;
-//import org.apache.commons.io.FileUtils;
-//import org.apache.commons.lang3.StringUtils;
-//import org.apache.iotdb.db.concurrent.ThreadName;
-//import org.apache.iotdb.db.conf.IoTDBConfig;
-//import org.apache.iotdb.db.conf.IoTDBDescriptor;
-//import org.apache.iotdb.db.conf.directories.DirectoryManager;
-//import org.apache.iotdb.db.engine.StorageEngine;
-//import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-//import org.apache.iotdb.db.exception.MetadataErrorException;
-//import org.apache.iotdb.db.exception.PathErrorException;
-//import org.apache.iotdb.db.exception.ProcessorException;
-//import org.apache.iotdb.db.exception.StorageEngineException;
-//import org.apache.iotdb.db.metadata.MManager;
-//import org.apache.iotdb.db.metadata.MetadataConstant;
-//import org.apache.iotdb.db.metadata.MetadataOperationType;
-//import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
-//import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-//import org.apache.iotdb.db.sync.sender.conf.Constans;
-//import org.apache.iotdb.db.utils.FilePathUtils;
-//import org.apache.iotdb.db.utils.SyncUtils;
-//import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
-//import org.apache.iotdb.service.sync.thrift.SyncService;
-//import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
-//import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-//import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
-//import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
-//import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-//import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-//import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-//import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
-//import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-//import org.apache.iotdb.tsfile.read.common.Field;
-//import org.apache.iotdb.tsfile.read.common.Path;
-//import org.apache.iotdb.tsfile.read.common.RowRecord;
-//import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-//import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//public class SyncServiceImplBackup implements SyncService.Iface {
-//
-//  private static final Logger logger = LoggerFactory.getLogger(SyncServiceImplBackup.class);
-//
-//  private static final StorageEngine STORAGE_GROUP_MANAGER = StorageEngine.getInstance();
-//  /**
-//   * Metadata manager
-//   **/
-//  private static final MManager metadataManger = MManager.getInstance();
-//
-//  private static final String SYNC_SERVER = Constans.SYNC_RECEIVER;
-//
-//  private ThreadLocal<String> uuid = new ThreadLocal<>();
-//  /**
-//   * String means storage group,List means the set of new files(path) in local IoTDB and String
-//   * means path of new Files
-//   **/
-//  private ThreadLocal<Map<String, List<String>>> fileNodeMap = new ThreadLocal<>();
-//  /**
-//   * Map String1 means timeseries String2 means path of new Files, long means startTime
-//   **/
-//  private ThreadLocal<Map<String, Map<String, Long>>> fileNodeStartTime = new ThreadLocal<>();
-//  /**
-//   * Map String1 means timeseries String2 means path of new Files, long means endTime
-//   **/
-//  private ThreadLocal<Map<String, Map<String, Long>>> fileNodeEndTime = new ThreadLocal<>();
-//
-//  /**
-//   * Total num of files that needs to be loaded
-//   */
-//  private ThreadLocal<Integer> fileNum = new ThreadLocal<>();
-//
-//  /**
-//   * IoTDB config
-//   **/
-//  private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-//
-//  /**
-//   * IoTDB data directory
-//   **/
-//  private String baseDir = config.getBaseDir();
-//
-//  /**
-//   * IoTDB  multiple bufferWrite directory
-//   **/
-//  private String[] bufferWritePaths = config.getDataDirs();
-//
-//  /**
-//   * The path to store metadata file of sender
-//   */
-//  private ThreadLocal<String> schemaFromSenderPath = new ThreadLocal<>();
-//
-//  /**
-//   * Sync folder path of server
-//   **/
-//  private String syncFolderPath;
-//
-//  /**
-//   * Sync data path of server
-//   */
-//  private String syncDataPath;
-//
-//  /**
-//   * Init threadLocal variable and delete old useless files.
-//   */
-//  @Override
-//  public boolean init(String storageGroup) {
-//    logger.info("Sync process starts to receive data of storage group {}", storageGroup);
-//    fileNum.set(0);
-//    fileNodeMap.set(new HashMap<>());
-//    fileNodeStartTime.set(new HashMap<>());
-//    fileNodeEndTime.set(new HashMap<>());
-//    try {
-//      FileUtils.deleteDirectory(new File(syncDataPath));
-//    } catch (IOException e) {
-//      logger.error("cannot delete directory {} ", syncFolderPath);
-//      return false;
-//    }
-//    for (String bufferWritePath : bufferWritePaths) {
-//      bufferWritePath = FilePathUtils.regularizePath(bufferWritePath);
-//      String backupPath = bufferWritePath + SYNC_SERVER + File.separator;
-//      File backupDirectory = new File(backupPath, this.uuid.get());
-//      if (backupDirectory.exists() && backupDirectory.list().length != 0) {
-//        try {
-//          FileUtils.deleteDirectory(backupDirectory);
-//        } catch (IOException e) {
-//          logger.error("cannot delete directory {} ", syncFolderPath);
-//          return false;
-//        }
-//      }
-//    }
-//    return true;
-//  }
-//
-//  /**
-//   * Verify IP address of sender
-//   */
-//  @Override
-//  public boolean checkIdentity(String uuid, String ipAddress) {
-//    Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
-//    this.uuid.set(uuid);
-//    initPath();
-//    return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress);
-//  }
-//
-//  /**
-//   * Init file path and clear data if last sync process failed.
-//   */
-//  private void initPath() {
-//    baseDir = FilePathUtils.regularizePath(baseDir);
-//    syncFolderPath = baseDir + SYNC_SERVER + File.separatorChar + this.uuid.get();
-//    syncDataPath = syncFolderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME;
-//    schemaFromSenderPath
-//        .set(syncFolderPath + File.separator + MetadataConstant.METADATA_LOG);
-//  }
-//
-//  /**
-//   * Acquire schema from sender
-//   *
-//   * @param status: FINIFSH_STATUS, SUCCESS_STATUS or PROCESSING_STATUS. status = FINISH_STATUS :
-//   * finish receiving schema file, start to sync schema. status = PROCESSING_STATUS : the schema
-//   * file has not received completely.SUCCESS_STATUS: load metadata.
-//   */
-//  @Override
-//  public String syncSchema(String md5, ByteBuffer schema, SyncDataStatus status) {
-//    String md5OfReceiver = Boolean.toString(Boolean.TRUE);
-//    if (status == SyncDataStatus.SUCCESS_STATUS) {
-//      /** sync metadata, include storage group and timeseries **/
-//      return Boolean.toString(loadMetadata());
-//    } else if (status == SyncDataStatus.PROCESSING_STATUS) {
-//      File file = new File(schemaFromSenderPath.get());
-//      if (!file.getParentFile().exists()) {
-//        try {
-//          file.getParentFile().mkdirs();
-//          file.createNewFile();
-//        } catch (IOException e) {
-//          logger.error("Cannot make schema file {}.", file.getPath(), e);
-//          md5OfReceiver = Boolean.toString(Boolean.FALSE);
-//        }
-//      }
-//      try (FileOutputStream fos = new FileOutputStream(file, true);
-//          FileChannel channel = fos.getChannel()) {
-//        channel.write(schema);
-//      } catch (Exception e) {
-//        logger.error("Cannot insert data to file {}.", file.getPath(), e);
-//        md5OfReceiver = Boolean.toString(Boolean.FALSE);
-//      }
-//    } else {
-//      try (FileInputStream fis = new FileInputStream(schemaFromSenderPath.get())) {
-//        MessageDigest md = MessageDigest.getInstance("MD5");
-//        byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
-//        int n;
-//        while ((n = fis.read(buffer)) != -1) {
-//          md.update(buffer, 0, n);
-//        }
-//        md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
-//        if (!md5.equals(md5OfReceiver)) {
-//          FileUtils.forceDelete(new File(schemaFromSenderPath.get()));
-//        }
-//      } catch (Exception e) {
-//        logger.error("Receiver cannot generate md5 {}", schemaFromSenderPath.get(), e);
-//      }
-//    }
-//    return md5OfReceiver;
-//  }
-//
-//  /**
-//   * Load metadata from sender
-//   */
-//  private boolean loadMetadata() {
-//    if (new File(schemaFromSenderPath.get()).exists()) {
-//      try (BufferedReader br = new BufferedReader(
-//          new java.io.FileReader(schemaFromSenderPath.get()))) {
-//        String metadataOperation;
-//        while ((metadataOperation = br.readLine()) != null) {
-//          operation(metadataOperation);
-//        }
-//      } catch (FileNotFoundException e) {
-//        logger.error("Cannot read the file {}.",
-//            schemaFromSenderPath.get(), e);
-//        return false;
-//      } catch (IOException e) {
-//        /** multiple insert schema, ignore it **/
-//      } catch (Exception e) {
-//        logger.error("Parse metadata operation failed.", e);
-//        return false;
-//      }
-//    }
-//    return true;
-//  }
-//
-//  /**
-//   * Operate metadata operation in MManager
-//   *
-//   * @param cmd metadata operation
-//   */
-//  private void operation(String cmd)
-//      throws PathErrorException, IOException, MetadataErrorException {
-//    String[] args = cmd.trim().split(",");
-//    switch (args[0]) {
-//      case MetadataOperationType.ADD_PATH_TO_MTREE:
-//        Map<String, String> props;
-//        String[] kv;
-//        props = new HashMap<>(args.length - 5 + 1, 1);
-//        for (int k = 5; k < args.length; k++) {
-//          kv = args[k].split("=");
-//          props.put(kv[0], kv[1]);
-//        }
-//        metadataManger.addPathToMTree(new Path(args[1]), TSDataType.deserialize(Short.valueOf(args[2])),
-//            TSEncoding.deserialize(Short.valueOf(args[3])),
-//            CompressionType.deserialize(Short.valueOf(args[4])),
-//            props);
-//        break;
-//      case MetadataOperationType.DELETE_PATH_FROM_MTREE:
-//        metadataManger.deletePaths(Collections.singletonList(new Path(args[1])));
-//        break;
-//      case MetadataOperationType.SET_STORAGE_LEVEL_TO_MTREE:
-//        metadataManger.setStorageLevelToMTree(args[1]);
-//        break;
-//      case MetadataOperationType.ADD_A_PTREE:
-//        metadataManger.addAPTree(args[1]);
-//        break;
-//      case MetadataOperationType.ADD_A_PATH_TO_PTREE:
-//        metadataManger.addPathToPTree(args[1]);
-//        break;
-//      case MetadataOperationType.DELETE_PATH_FROM_PTREE:
-//        metadataManger.deletePathFromPTree(args[1]);
-//        break;
-//      case MetadataOperationType.LINK_MNODE_TO_PTREE:
-//        metadataManger.linkMNodeToPTree(args[1], args[2]);
-//        break;
-//      case MetadataOperationType.UNLINK_MNODE_FROM_PTREE:
-//        metadataManger.unlinkMNodeFromPTree(args[1], args[2]);
-//        break;
-//      default:
-//        logger.error("Unrecognizable command {}", cmd);
-//    }
-//  }
-//
-//  /**
-//   * Start receiving tsfile from sender
-//   *
-//   * @param status status = SUCCESS_STATUS : finish receiving one tsfile status = PROCESSING_STATUS
-//   * : tsfile has not received completely.
-//   */
-//  @Override
-//  public String syncData(String md5OfSender, List<String> filePathSplit,
-//      ByteBuffer dataToReceive, SyncDataStatus status) {
-//    String md5OfReceiver = Boolean.toString(Boolean.TRUE);
-//    FileChannel channel;
-//    /** Recombination File Path **/
-//    String filePath = StringUtils.join(filePathSplit, File.separatorChar);
-//    syncDataPath = FilePathUtils.regularizePath(syncDataPath);
-//    filePath = syncDataPath + filePath;
-//    if (status == SyncDataStatus.PROCESSING_STATUS) { // there are still data stream to add
-//      File file = new File(filePath);
-//      if (!file.getParentFile().exists()) {
-//        try {
-//          file.getParentFile().mkdirs();
-//          file.createNewFile();
-//        } catch (IOException e) {
-//          logger.error("cannot make file {}", file.getPath(), e);
-//          md5OfReceiver = Boolean.toString(Boolean.FALSE);
-//        }
-//      }
-//      try (FileOutputStream fos = new FileOutputStream(file, true)) {// append new data
-//        channel = fos.getChannel();
-//        channel.write(dataToReceive);
-//      } catch (IOException e) {
-//        logger.error("cannot insert data to file {}", file.getPath(), e);
-//        md5OfReceiver = Boolean.toString(Boolean.FALSE);
-//
-//      }
-//    } else { // all data in the same file has received successfully
-//      try (FileInputStream fis = new FileInputStream(filePath)) {
-//        MessageDigest md = MessageDigest.getInstance("MD5");
-//        byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
-//        int n;
-//        while ((n = fis.read(buffer)) != -1) {
-//          md.update(buffer, 0, n);
-//        }
-//        md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
-//        if (md5OfSender.equals(md5OfReceiver)) {
-//          fileNum.set(fileNum.get() + 1);
-//
-//          logger.info(String.format("Receiver has received %d files from sender", fileNum.get()));
-//        } else {
-//          FileUtils.forceDelete(new File(filePath));
-//        }
-//      } catch (Exception e) {
-//        logger.error("Receiver cannot generate md5 {}", filePath, e);
-//      }
-//    }
-//    return md5OfReceiver;
-//  }
-//
-//
-//  @Override
-//  public boolean load() {
-//    try {
-//      getFileNodeInfo();
-//      loadData();
-//    } catch (Exception e) {
-//      logger.error("fail to load data", e);
-//      return false;
-//    }
-//    return true;
-//  }
-//
-//  /**
-//   * Get all tsfiles' info which are sent from sender, it is preparing for merging these data
-//   */
-//  public void getFileNodeInfo() throws IOException {
-//    File dataFileRoot = new File(syncDataPath);
-//    File[] files = dataFileRoot.listFiles();
-//    int processedNum = 0;
-//    for (File storageGroupPB : files) {
-//      List<String> filesPath = new ArrayList<>();
-//      File[] filesSG = storageGroupPB.listFiles();
-//      for (File fileTF : filesSG) { // fileTF means TsFiles
-//        Map<String, Long> startTimeMap = new HashMap<>();
-//        Map<String, Long> endTimeMap = new HashMap<>();
-//        TsFileSequenceReader reader = null;
-//        try {
-//          reader = new TsFileSequenceReader(fileTF.getPath());
-//          Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-//          Iterator<String> it = deviceIdMap.keySet().iterator();
-//          while (it.hasNext()) {
-//            String key = it.next();
-//            TsDeviceMetadataIndex device = deviceIdMap.get(key);
-//            startTimeMap.put(key, device.getStartTime());
-//            endTimeMap.put(key, device.getEndTime());
-//          }
-//        } catch (IOException e) {
-//          logger.error("Unable to read tsfile {}", fileTF.getPath());
-//          throw new IOException(e);
-//        } finally {
-//          try {
-//            if (reader != null) {
-//              reader.close();
-//            }
-//          } catch (IOException e) {
-//            logger.error("Cannot close tsfile stream {}", fileTF.getPath());
-//            throw new IOException(e);
-//          }
-//        }
-//        fileNodeStartTime.get().put(fileTF.getPath(), startTimeMap);
-//        fileNodeEndTime.get().put(fileTF.getPath(), endTimeMap);
-//        filesPath.add(fileTF.getPath());
-//        processedNum++;
-//        logger.info(String
-//            .format("Get tsfile info has complete : %d/%d", processedNum, fileNum.get()));
-//        fileNodeMap.get().put(storageGroupPB.getName(), filesPath);
-//      }
-//    }
-//  }
-//
-//
-//  /**
-//   * It is to merge data. If data in the tsfile is new, append the tsfile to the storage group
-//   * directly. If data in the tsfile is old, it has two strategy to merge.It depends on the
-//   * possibility of updating historical data.
-//   */
-//  public void loadData() throws StorageEngineException {
-//    syncDataPath = FilePathUtils.regularizePath(syncDataPath);
-//    int processedNum = 0;
-//    for (String storageGroup : fileNodeMap.get().keySet()) {
-//      List<String> filesPath = fileNodeMap.get().get(storageGroup);
-//      /**  before load external tsFile, it is necessary to order files in the same storage group **/
-//      Collections.sort(filesPath, (o1, o2) -> {
-//        Map<String, Long> startTimePath1 = fileNodeStartTime.get().get(o1);
-//        Map<String, Long> endTimePath2 = fileNodeEndTime.get().get(o2);
-//        for (Entry<String, Long> entry : endTimePath2.entrySet()) {
-//          if (startTimePath1.containsKey(entry.getKey())) {
-//            if (startTimePath1.get(entry.getKey()) > entry.getValue()) {
-//              return 1;
-//            } else {
-//              return -1;
-//            }
-//          }
-//        }
-//        return 0;
-//      });
-//
-//      for (String path : filesPath) {
-//        // get startTimeMap and endTimeMap
-//        Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path);
-//        Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path);
-//
-//        // create a new fileNode
-//        String header = syncDataPath;
-//        String relativePath = path.substring(header.length());
-//        TsFileResource fileNode = new TsFileResource(
-//            new File(DirectoryManager.getInstance().getNextFolderIndexForSequenceFile() +
-//                File.separator + relativePath), startTimeMap, endTimeMap
-//        );
-//        // call interface of load external file
-//        try {
-//          if (!STORAGE_GROUP_MANAGER.appendFileToStorageGroupProcessor(storageGroup, fileNode, path)) {
-//            // it is a file with unsequence data
-//            if (config.isUpdateHistoricalDataPossibility()) {
-//              loadOldData(path);
-//            } else {
-//              List<String> overlapFiles = STORAGE_GROUP_MANAGER.getOverlapFiles(
-//                  storageGroup,
-//                  fileNode, uuid.get());
-//              if (overlapFiles.isEmpty()) {
-//                loadOldData(path);
-//              } else {
-//                loadOldData(path, overlapFiles);
-//              }
-//            }
-//          }
-//        } catch (StorageEngineException | IOException | ProcessorException e) {
-//          logger.error("Can not load external file {}", path);
-//          throw new StorageEngineException(e);
-//        }
-//        processedNum++;
-//        logger.info(String
-//            .format("Merging files has completed : %d/%d", processedNum, fileNum.get()));
-//      }
-//    }
-//  }
-//
-//  /**
-//   * Insert all data in the tsfile into IoTDB.
-//   */
-//  public void loadOldData(String filePath) throws IOException, ProcessorException {
-//    Set<String> timeseriesSet = new HashSet<>();
-//    TsFileSequenceReader reader = null;
-//    QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
-//    try {
-//      /** use tsfile reader to get data **/
-//      reader = new TsFileSequenceReader(filePath);
-//      Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-//      Iterator<Entry<String, TsDeviceMetadataIndex>> entryIterator = deviceIdMap.entrySet()
-//          .iterator();
-//      while (entryIterator.hasNext()) {
-//        Entry<String, TsDeviceMetadataIndex> deviceMIEntry = entryIterator.next();
-//        String deviceId = deviceMIEntry.getKey();
-//        TsDeviceMetadataIndex deviceMI = deviceMIEntry.getValue();
-//        TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
-//        List<ChunkGroupMetaData> rowGroupMetadataList = deviceMetadata.getChunkGroupMetaDataList();
-//        timeseriesSet.clear();
-//        /** firstly, get all timeseries in the same device **/
-//        for (ChunkGroupMetaData chunkGroupMetaData : rowGroupMetadataList) {
-//          List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData
-//              .getChunkMetaDataList();
-//          for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
-//            String measurementUID = chunkMetaData.getMeasurementUid();
-//            measurementUID = deviceId + "." + measurementUID;
-//            timeseriesSet.add(measurementUID);
-//          }
-//        }
-//        /** Secondly, use tsFile Reader to form InsertPlan **/
-//        ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
-//        List<Path> paths = new ArrayList<>();
-//        paths.clear();
-//        for (String timeseries : timeseriesSet) {
-//          paths.add(new Path(timeseries));
-//        }
-//        QueryExpression queryExpression = QueryExpression.create(paths, null);
-//        QueryDataSet queryDataSet = readTsFile.query(queryExpression);
-//        while (queryDataSet.hasNext()) {
-//          RowRecord record = queryDataSet.next();
-//          List<Field> fields = record.getFields();
-//          List<String> measurementList = new ArrayList<>();
-//          List<String> insertValues = new ArrayList<>();
-//          for (int i = 0; i < fields.size(); i++) {
-//            Field field = fields.get(i);
-//            if (!field.isNull()) {
-//              measurementList.add(paths.get(i).getMeasurement());
-//              if (fields.get(i).getDataType() == TSDataType.TEXT) {
-//                insertValues.add(String.format("'%s'", field.toString()));
-//              } else {
-//                insertValues.add(String.format("%s", field.toString()));
-//              }
-//            }
-//          }
-//          if (insertExecutor.insert(new InsertPlan(deviceId, record.getTimestamp(),
-//              measurementList.toArray(new String[0]), insertValues.toArray(new String[0])))) {
-//            throw new IOException("Inserting series data to IoTDB engine has failed.");
-//          }
-//        }
-//      }
-//    } catch (IOException e) {
-//      logger.error("Can not parse tsfile into SQL", e);
-//      throw new IOException(e);
-//    } catch (ProcessorException e) {
-//      logger.error("Meet error while processing non-query.");
-//      throw new ProcessorException(e);
-//    } finally {
-//      try {
-//        if (reader != null) {
-//          reader.close();
-//        }
-//      } catch (IOException e) {
-//        logger.error("Cannot close file stream {}", filePath, e);
-//      }
-//    }
-//  }
-//
-//  /**
-//   * Insert those valid data in the tsfile into IoTDB
-//   *
-//   * @param overlapFiles:files which are conflict with the sync file
-//   */
-//  public void loadOldData(String filePath, List<String> overlapFiles)
-//      throws IOException, ProcessorException {
-//    Set<String> timeseriesList = new HashSet<>();
-//    QueryProcessExecutor insertExecutor = new QueryProcessExecutor();
-//    Map<String, ReadOnlyTsFile> tsfilesReaders = openReaders(filePath, overlapFiles);
-//    try {
-//      TsFileSequenceReader reader = new TsFileSequenceReader(filePath);
-//      Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap();
-//      Iterator<String> it = deviceIdMap.keySet().iterator();
-//      while (it.hasNext()) {
-//        String deviceID = it.next();
-//        TsDeviceMetadataIndex deviceMI = deviceIdMap.get(deviceID);
-//        TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI);
-//        List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
-//            .getChunkGroupMetaDataList();
-//        timeseriesList.clear();
-//        /** firstly, get all timeseries in the same device **/
-//        for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
-//          List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData.getChunkMetaDataList();
-//          for (ChunkMetaData timeSeriesChunkMetaData : chunkMetaDataList) {
-//            String measurementUID = timeSeriesChunkMetaData.getMeasurementUid();
-//            measurementUID = deviceID + "." + measurementUID;
-//            timeseriesList.add(measurementUID);
-//          }
-//        }
-//        reader.close();
-//
-//        /** secondly, use tsFile Reader to form SQL **/
-//        ReadOnlyTsFile readOnlyTsFile = tsfilesReaders.get(filePath);
-//        List<Path> paths = new ArrayList<>();
-//        /** compare data with one timeseries in a round to get valid data **/
-//        for (String timeseries : timeseriesList) {
-//          paths.clear();
-//          paths.add(new Path(timeseries));
-//          Set<InsertPlan> originDataPoints = new HashSet<>();
-//          QueryExpression queryExpression = QueryExpression.create(paths, null);
-//          QueryDataSet queryDataSet = readOnlyTsFile.query(queryExpression);
-//          Set<InsertPlan> newDataPoints = convertToInserPlans(queryDataSet, paths, deviceID);
-//
-//          /** get all data with the timeseries in all overlap files. **/
-//          for (String overlapFile : overlapFiles) {
-//            ReadOnlyTsFile readTsFileOverlap = tsfilesReaders.get(overlapFile);
-//            QueryDataSet queryDataSetOverlap = readTsFileOverlap.query(queryExpression);
-//            originDataPoints.addAll(convertToInserPlans(queryDataSetOverlap, paths, deviceID));
-//          }
-//
-//          /** If there has no overlap data with the timeseries, inserting all data in the sync file **/
-//          if (originDataPoints.isEmpty()) {
-//            for (InsertPlan insertPlan : newDataPoints) {
-//              if (insertExecutor.insert(insertPlan)) {
-//                throw new IOException("Inserting series data to IoTDB engine has failed.");
-//              }
-//            }
-//          } else {
-//            /** Compare every data to get valid data **/
-//            for (InsertPlan insertPlan : newDataPoints) {
-//              if (!originDataPoints.contains(insertPlan)) {
-//                if (insertExecutor.insert(insertPlan)) {
-//                  throw new IOException("Inserting series data to IoTDB engine has failed.");
-//                }
-//              }
-//            }
-//          }
-//        }
-//      }
-//    } catch (IOException e) {
-//      logger.error("Can not parse tsfile into SQL", e);
-//      throw new IOException(e);
-//    } catch (ProcessorException e) {
-//      logger.error("Meet error while processing non-query.", e);
-//      throw new ProcessorException(e);
-//    } finally {
-//      try {
-//        closeReaders(tsfilesReaders);
-//      } catch (IOException e) {
-//        logger.error("Cannot close file stream {}", filePath, e);
-//      }
-//    }
-//  }
-//
-//  private Set<InsertPlan> convertToInserPlans(QueryDataSet queryDataSet, List<Path> paths, String deviceID) throws IOException {
-//    Set<InsertPlan> plans = new HashSet<>();
-//    while (queryDataSet.hasNext()) {
-//      RowRecord record = queryDataSet.next();
-//      List<Field> fields = record.getFields();
-//      /** get all data with the timeseries in the sync file **/
-//      for (int i = 0; i < fields.size(); i++) {
-//        Field field = fields.get(i);
-//        String[] measurementList = new String[1];
-//        if (!field.isNull()) {
-//          measurementList[0] = paths.get(i).getMeasurement();
-//          InsertPlan insertPlan = new InsertPlan(deviceID, record.getTimestamp(),
-//              measurementList, new String[]{field.getDataType() == TSDataType.TEXT ? String.format("'%s'", field.toString())
-//              : field.toString()});
-//          plans.add(insertPlan);
-//        }
-//      }
-//    }
-//    return plans;
-//  }
-//
-//  /**
-//   * Open all tsfile reader and cache
-//   */
-//  private Map<String, ReadOnlyTsFile> openReaders(String filePath, List<String> overlapFiles)
-//      throws IOException {
-//    Map<String, ReadOnlyTsFile> tsfileReaders = new HashMap<>();
-//    tsfileReaders.put(filePath, new ReadOnlyTsFile(new TsFileSequenceReader(filePath)));
-//    for (String overlapFile : overlapFiles) {
-//      tsfileReaders.put(overlapFile, new ReadOnlyTsFile(new TsFileSequenceReader(overlapFile)));
-//    }
-//    return tsfileReaders;
-//  }
-//
-//  /**
-//   * Close all tsfile reader
-//   */
-//  private void closeReaders(Map<String, ReadOnlyTsFile> readers) throws IOException {
-//    for (ReadOnlyTsFile tsfileReader : readers.values()) {
-//      tsfileReader.close();
-//    }
-//  }
-//
-//  /**
-//   * Release threadLocal variable resources
-//   */
-//  @Override
-//  public void cleanUp() {
-//    uuid.remove();
-//    fileNum.remove();
-//    fileNodeMap.remove();
-//    fileNodeStartTime.remove();
-//    fileNodeEndTime.remove();
-//    schemaFromSenderPath.remove();
-//    try {
-//      FileUtils.deleteDirectory(new File(syncFolderPath));
-//    } catch (IOException e) {
-//      logger.error("can not delete directory {}", syncFolderPath, e);
-//    }
-//    logger.info("Synchronization has finished!");
-//  }
-//
-//  public Map<String, List<String>> getFileNodeMap() {
-//    return fileNodeMap.get();
-//  }
-//
-//  public void setFileNodeMap(Map<String, List<String>> fileNodeMap) {
-//    this.fileNodeMap.set(fileNodeMap);
-//  }
-//
-//}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index 05b67c0..dfe3c52 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -30,9 +30,9 @@ public class Constans {
   public static final String LOCK_FILE_NAME = "sync_lock";
   public static final String SCHEMA_POS_FILE_NAME = "sync_schema_pos";
   public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
+  public static final String CURRENT_LOCAL_FILE_NAME = "current_local_files.txt";
   public static final String DATA_SNAPSHOT_NAME = "snapshot";
   public static final String SYNC_LOG_NAME = "sync.log";
-  public static final String CURRENT_SYNC_LOG_NAME = "current_sync.log";
 
   public static final String MESSAGE_DIGIT_NAME = "MD5";
   public static final String SYNC_DIR_NAME_SEPARATOR = "_";
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
index d4f7e33..1a529be 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
@@ -20,8 +20,6 @@ package org.apache.iotdb.db.sync.sender.manage;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
 
 public interface ISyncFileManager {
 
@@ -30,6 +28,4 @@ public interface ISyncFileManager {
   void getLastLocalFiles(File lastLocalFile) throws IOException;
 
   void getValidFiles(String dataDir) throws IOException;
-
-  void updateLastLocalFiles(File lastLocalFile, Set<String> localFiles);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
index 20f1dca..c08b64f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -125,15 +125,6 @@ public class SyncFileManager implements ISyncFileManager {
     }
   }
 
-  @Override
-  public void updateLastLocalFiles(File lastLocalFile, Set<String> localFiles) {
-
-  }
-
-  public Map<String, Set<File>> getCurrentSealedLocalFilesMap() {
-    return currentSealedLocalFilesMap;
-  }
-
   public Map<String, Set<File>> getLastLocalFilesMap() {
     return lastLocalFilesMap;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
index 3b33f78..53cb54f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
@@ -22,12 +22,12 @@ import java.util.Set;
 
 public interface ISyncSenderLogAnalyzer {
 
-  void loadLastLocalFiles(Set<String> set);
+  void loadLastLocalFiles(Set<String> lastLocalFiles);
 
-  void loadLogger(Set<String> set);
+  void loadLogger(Set<String> deletedFiles, Set<String> newFiles);
 
   void recover();
 
-  void clearLogger();
+  void clearLogger(Set<String> currentLocalFiles);
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
index 15df693..d1cdf9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
@@ -23,22 +23,14 @@ import java.io.IOException;
 
 public interface ISyncSenderLogger {
 
-  void startSync() throws IOException;
-
-  void endSync() throws IOException;
-
   void startSyncDeletedFilesName() throws IOException;
 
   void finishSyncDeletedFileName(File file) throws IOException;
 
-  void endSyncDeletedFilsName() throws IOException;
-
   void startSyncTsFiles() throws IOException;
 
   void finishSyncTsfile(File file) throws IOException;
 
-  void endSyncTsFiles() throws IOException;
-
   void close() throws IOException;
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
new file mode 100644
index 0000000..9649806
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.recover;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderLogAnalyzer.class);
+  private File currentLocalFile;
+  private File lastLocalFile;
+  private File syncLogFile;
+
+  public SyncSenderLogAnalyzer(String senderPath) {
+    this.currentLocalFile = new File(senderPath, Constans.CURRENT_LOCAL_FILE_NAME);
+    this.lastLocalFile = new File(senderPath, Constans.LAST_LOCAL_FILE_NAME);
+    this.syncLogFile = new File(senderPath, Constans.SYNC_LOG_NAME);
+  }
+
+  @Override
+  public void recover() {
+    if (currentLocalFile.exists() && !lastLocalFile.exists()) {
+      currentLocalFile.renameTo(lastLocalFile);
+    } else {
+      Set<String> lastLocalFiles = new HashSet<>();
+      Set<String> deletedFiles = new HashSet<>();
+      Set<String> newFiles = new HashSet<>();
+      loadLastLocalFiles(lastLocalFiles);
+      loadLogger(deletedFiles, newFiles);
+      lastLocalFiles.removeAll(deletedFiles);
+      lastLocalFiles.addAll(newFiles);
+      clearLogger(lastLocalFiles);
+    }
+  }
+
+  @Override
+  public void loadLastLocalFiles(Set<String> lastLocalFiles) {
+    try (BufferedReader br = new BufferedReader(new FileReader(lastLocalFile))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        lastLocalFiles.add(line);
+      }
+    } catch (IOException e) {
+      LOGGER
+          .error("Can not load last local file list from file {}", lastLocalFile.getAbsoluteFile(),
+              e);
+    }
+  }
+
+  @Override
+  public void loadLogger(Set<String> deletedFiles, Set<String> newFiles) {
+    try (BufferedReader br = new BufferedReader(new FileReader(syncLogFile))) {
+      String line;
+      int mode = 0;
+      while ((line = br.readLine()) != null) {
+        if (line.equals(SyncSenderLogger.SYNC_DELETED_FILE_NAME_START)) {
+          mode = -1;
+        } else if (line.equals(SyncSenderLogger.SYNC_TSFILE_START)) {
+          mode = 1;
+        } else {
+          if (mode == -1) {
+            deletedFiles.add(line);
+          } else if (mode == 1) {
+            newFiles.add(line);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOGGER
+          .error("Can not load sync log from file {}", syncLogFile.getAbsoluteFile(),
+              e);
+    }
+  }
+
+  @Override
+  public void clearLogger(Set<String> currentLocalFiles) {
+    try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
+      for (String line : currentLocalFiles) {
+        bw.write(line);
+        bw.newLine();
+      }
+      bw.flush();
+    } catch (IOException e) {
+      LOGGER.error("Can not clear sync log {}", syncLogFile.getAbsoluteFile(), e);
+    }
+    currentLocalFile.renameTo(lastLocalFile);
+  }
+}
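
For orientation, a minimal sketch of how the analyzer above is driven on sender startup; it mirrors the call in DataTransferManager.checkRecovery() further down in this commit, and the directory path is hypothetical:

import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;

public class SenderRecoveryExample {

  public static void main(String[] args) {
    // Hypothetical sender directory; per Constans it is expected to contain
    // last_local_files.txt, current_local_files.txt and sync.log.
    String senderPath = "/data/iotdb/sync/sender";

    // If only current_local_files.txt exists, recover() renames it to
    // last_local_files.txt; otherwise it replays sync.log, removing deleted
    // file names from the last local file list, adding newly synced tsfiles,
    // and writing the merged list back via clearLogger().
    new SyncSenderLogAnalyzer(senderPath).recover();
  }
}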
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
index 8171d0f..8e118d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.sender.recover;
 
 import java.io.BufferedWriter;
@@ -7,12 +25,8 @@ import java.io.IOException;
 
 public class SyncSenderLogger implements ISyncSenderLogger {
 
-  public static final String SYNC_START = "sync start";
-  public static final String SYNC_END = "sync end";
   public static final String SYNC_DELETED_FILE_NAME_START = "sync deleted file names start";
-  public static final String SYNC_DELETED_FILE_NAME_END = "sync deleted file names end";
   public static final String SYNC_TSFILE_START = "sync tsfile start";
-  public static final String SYNC_TSFILE_END = "sync tsfile end";
   private BufferedWriter bw;
 
   public SyncSenderLogger(String filePath) throws IOException {
@@ -24,20 +38,6 @@ public class SyncSenderLogger implements ISyncSenderLogger {
   }
 
   @Override
-  public void startSync() throws IOException {
-    bw.write(SYNC_START);
-    bw.newLine();
-    bw.flush();
-  }
-
-  @Override
-  public void endSync() throws IOException {
-    bw.write(SYNC_END);
-    bw.newLine();
-    bw.flush();
-  }
-
-  @Override
   public void startSyncDeletedFilesName() throws IOException {
     bw.write(SYNC_DELETED_FILE_NAME_START);
     bw.newLine();
@@ -52,13 +52,6 @@ public class SyncSenderLogger implements ISyncSenderLogger {
   }
 
   @Override
-  public void endSyncDeletedFilsName() throws IOException {
-    bw.write(SYNC_DELETED_FILE_NAME_END);
-    bw.newLine();
-    bw.flush();
-  }
-
-  @Override
   public void startSyncTsFiles() throws IOException {
     bw.write(SYNC_TSFILE_START);
     bw.newLine();
@@ -73,13 +66,6 @@ public class SyncSenderLogger implements ISyncSenderLogger {
   }
 
   @Override
-  public void endSyncTsFiles() throws IOException {
-    bw.write(SYNC_TSFILE_END);
-    bw.newLine();
-    bw.flush();
-  }
-
-  @Override
   public void close() throws IOException {
     bw.close();
   }
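
As a companion sketch, this is roughly how the trimmed-down logger is used by DataTransferManager below; the file paths are hypothetical and only the methods kept by this commit are assumed. The resulting sync.log holds just the two start markers followed by the logged entries, which is the format SyncSenderLogAnalyzer.loadLogger() parses.

import java.io.File;
import java.io.IOException;
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;

public class SenderLogExample {

  public static void main(String[] args) throws IOException {
    // Hypothetical log path and tsfile paths, for illustration only.
    SyncSenderLogger syncLog = new SyncSenderLogger("/data/iotdb/sync/sender/sync.log");

    // Writes the "sync deleted file names start" marker, then records each
    // deleted file that was reported to the receiver.
    syncLog.startSyncDeletedFilesName();
    syncLog.finishSyncDeletedFileName(new File("/data/iotdb/data/root.sg1/1566400000000-101.tsfile"));

    // Writes the "sync tsfile start" marker, then records each tsfile whose
    // transfer completed successfully.
    syncLog.startSyncTsFiles();
    syncLog.finishSyncTsfile(new File("/data/iotdb/data/root.sg1/1566450000000-102.tsfile"));

    syncLog.close();
  }
}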
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index c616376..fc68960 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -1,16 +1,4 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements.  See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with the License.  You may obtain
- * a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language
- * governing permissions and limitations under the License.
- */
+
 package org.apache.iotdb.db.sync.sender.transfer;
 
 import java.io.BufferedReader;
@@ -18,8 +6,6 @@ import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -33,12 +19,10 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
@@ -50,10 +34,10 @@ import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
 import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
 import org.apache.iotdb.db.sync.sender.manage.SyncFileManager;
+import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
 import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
 import org.apache.iotdb.db.utils.SyncUtils;
 import org.apache.iotdb.service.sync.thrift.ResultStatus;
-import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.thrift.TException;
@@ -91,7 +75,7 @@ public class DataTransferManager implements IDataTransferManager {
 
   private Map<String, Set<File>> sucessSyncedFilesMap;
 
-  private Map<String, Set<File>> successDeleyedFilesMap;
+  private Map<String, Set<File>> successDeletedFilesMap;
 
   private Map<String, Set<File>> lastLocalFilesMap;
 
@@ -102,11 +86,6 @@ public class DataTransferManager implements IDataTransferManager {
 
   private SyncSenderLogger syncLog;
 
-  /**
-   * Key means storage group, Set means corresponding tsfiles
-   **/
-  private Map<String, Set<String>> validFileSnapshot = new HashMap<>();
-
   private SyncFileManager syncFileManager = SyncFileManager.getInstance();
 
   private ScheduledExecutorService executorService;
@@ -130,6 +109,53 @@ public class DataTransferManager implements IDataTransferManager {
     fileSenderImpl.startTimedTask();
   }
 
+
+  /**
+   * Verify whether the client lock file is already locked, ensuring that only one sync
+   * client is running.
+   */
+  private void verifySingleton() throws IOException {
+    File lockFile = new File(config.getLockFilePath());
+    if (!lockFile.getParentFile().exists()) {
+      lockFile.getParentFile().mkdirs();
+    }
+    if (!lockFile.exists()) {
+      lockFile.createNewFile();
+    }
+    if (!lockInstance(config.getLockFilePath())) {
+      logger.error("Sync client is already running.");
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Try to lock the lock file. If locking fails, another sync client has already been started.
+   *
+   * @param lockFile path of lock file
+   */
+  private boolean lockInstance(final String lockFile) {
+    try {
+      final File file = new File(lockFile);
+      final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+      final FileLock fileLock = randomAccessFile.getChannel().tryLock();
+      if (fileLock != null) {
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+          try {
+            fileLock.release();
+            randomAccessFile.close();
+          } catch (Exception e) {
+            logger.error("Unable to remove lock file: {}", lockFile, e);
+          }
+        }));
+        return true;
+      }
+    } catch (Exception e) {
+      logger.error("Unable to create and/or lock file: {}", lockFile, e);
+    }
+    return false;
+  }
+
+
   @Override
   public void init() {
     if (executorService == null) {
@@ -221,6 +247,8 @@ public class DataTransferManager implements IDataTransferManager {
 
       // 1. Sync data
       for (Entry<String, Set<File>> entry : deletedFilesMap.entrySet()) {
+        checkRecovery();
+        syncLog = new SyncSenderLogger(getSyncLogFile());
         // TODO deal with the situation
         try {
           if (serviceClient.init(entry.getKey()) == ResultStatus.FAILURE) {
@@ -232,12 +260,10 @@ public class DataTransferManager implements IDataTransferManager {
         logger.info("Sync process starts to transfer data of storage group {}", entry.getKey());
         syncDeletedFilesName(entry.getKey(), entry.getValue());
         syncDataFilesInOneGroup(entry.getKey(), entry.getValue());
+        clearSyncLog();
       }
 
-      // 2. Clear sync log
-      clearSyncLog();
-
-    } catch (SyncConnectionException | TException e) {
+    } catch (SyncConnectionException e) {
       logger.error("cannot finish sync process", e);
     } finally {
       if (syncLog != null) {
@@ -248,23 +274,22 @@ public class DataTransferManager implements IDataTransferManager {
   }
 
   private void checkRecovery() {
-
-  }
-
-  public void clearSyncLog() {
-
+    new SyncSenderLogAnalyzer(config.getSenderPath()).recover();
   }
 
   @Override
-  public void syncDeletedFilesName(String sgName, Set<File> deletedFilesName) {
+  public void syncDeletedFilesName(String sgName, Set<File> deletedFilesName) throws IOException {
     if (deletedFilesName.isEmpty()) {
       logger.info("There has no deleted files to be synced in storage group {}", sgName);
       return;
     }
+    syncLog.startSyncDeletedFilesName();
     logger.info("Start to sync names of deleted files in storage group {}", sgName);
-    for(File file:deletedFilesName){
+    for (File file : deletedFilesName) {
       try {
         serviceClient.syncDeletedFileName(file.getName());
+        successDeletedFilesMap.get(sgName).add(file);
+        syncLog.finishSyncDeletedFileName(file);
       } catch (TException e) {
         logger.error("Can not sync deleted file name {}, skip it.", file);
       }
@@ -274,27 +299,29 @@ public class DataTransferManager implements IDataTransferManager {
 
   @Override
   public void syncDataFilesInOneGroup(String sgName, Set<File> toBeSyncFiles)
-      throws SyncConnectionException {
-    Set<String> validSnapshot = validFileSnapshot.get(sgName);
-    if (validSnapshot.isEmpty()) {
+      throws SyncConnectionException, IOException {
+    if (toBeSyncFiles.isEmpty()) {
       logger.info("There has no new tsfiles to be synced in storage group {}", sgName);
       return;
     }
+    syncLog.startSyncTsFiles();
     logger.info("Sync process starts to transfer data of storage group {}", sgName);
     int cnt = 0;
-    for (String tsfilePath : toBeSyncFiles) {
+    for (File tsfile : toBeSyncFiles) {
       cnt++;
       File snapshotFile = null;
       try {
-        snapshotFile = makeFileSnapshot(tsfilePath);
+        snapshotFile = makeFileSnapshot(tsfile);
         syncSingleFile(snapshotFile);
+        sucessSyncedFilesMap.get(sgName).add(tsfile);
+        syncLog.finishSyncTsfile(tsfile);
         logger.info("Task of synchronization has completed {}/{}.", cnt, toBeSyncFiles.size());
       } catch (IOException e) {
         logger.info(
             "Tsfile {} can not make snapshot, so skip the tsfile and continue to sync other tsfiles",
-            tsfilePath, e);
+            tsfile, e);
       } finally {
-        if(snapshotFile != null) {
+        if (snapshotFile != null) {
           snapshotFile.deleteOnExit();
         }
       }
@@ -302,27 +329,25 @@ public class DataTransferManager implements IDataTransferManager {
     logger.info("Sync process has finished storage group {}.", sgName);
   }
 
-  private File makeFileSnapshot(String filePath) throws IOException {
-    String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath);
-    File newFile = new File(snapshotFilePath);
-    if (!newFile.getParentFile().exists()) {
-      newFile.getParentFile().mkdirs();
+  private File makeFileSnapshot(File file) throws IOException {
+    File snapshotFile = SyncUtils.getSnapshotFile(file);
+    if (!snapshotFile.getParentFile().exists()) {
+      snapshotFile.getParentFile().mkdirs();
     }
-    Path link = FileSystems.getDefault().getPath(snapshotFilePath);
-    Path target = FileSystems.getDefault().getPath(filePath);
+    Path link = FileSystems.getDefault().getPath(snapshotFile.getAbsolutePath());
+    Path target = FileSystems.getDefault().getPath(file.getAbsolutePath());
     Files.createLink(link, target);
-    return newFile;
+    return snapshotFile;
   }
 
-
   /**
    * Transfer data of a storage group to receiver.
-   *
    */
   private void syncSingleFile(File snapshotFile) throws SyncConnectionException {
     try {
       int retryCount = 0;
       MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME);
+      serviceClient.initSyncData(snapshotFile.getName());
       outer:
       while (true) {
         retryCount++;
@@ -341,9 +366,7 @@ public class DataTransferManager implements IDataTransferManager {
             md.update(buffer, 0, dataLength);
             ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
             bos.reset();
-            if (!Boolean.parseBoolean(serviceClient
-                .syncData(null, snapshotFile.getName(), buffToSend,
-                    SyncDataStatus.PROCESSING_STATUS))) {
+            if (serviceClient.syncData(buffToSend) == ResultStatus.FAILURE) {
               logger.info("Receiver failed to receive data from {}, retry.",
                   snapshotFile.getAbsoluteFile());
               continue outer;
@@ -353,14 +376,13 @@ public class DataTransferManager implements IDataTransferManager {
 
         // the file is sent successfully
         String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
-        String md5OfReceiver = serviceClient.syncData(md5OfSender, snapshotFile.getName(),
-            null, SyncDataStatus.FINISH_STATUS);
+        String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
         if (md5OfSender.equals(md5OfReceiver)) {
           logger.info("Receiver has received {} successfully.", snapshotFile.getAbsoluteFile());
           break;
         }
       }
-    } catch (IOException e) {
+    } catch (IOException | TException | NoSuchAlgorithmException e) {
       throw new SyncConnectionException("Cannot sync data with receiver.", e);
     }
   }
@@ -389,67 +411,14 @@ public class DataTransferManager implements IDataTransferManager {
    * UUID marks the identity of sender for receiver.
    */
   @Override
-  public boolean confirmIdentity(String uuidPath) throws SyncConnectionException, IOException {
-    File file = new File(uuidPath);
-    /** Mark the identity of sender **/
-    String uuid;
-    if (!file.getParentFile().exists()) {
-      file.getParentFile().mkdirs();
-    }
-    if (!file.exists()) {
-      try (FileOutputStream out = new FileOutputStream(file)) {
-        file.createNewFile();
-        uuid = generateUUID();
-        out.write(uuid.getBytes());
-      } catch (IOException e) {
-        logger.error("Cannot insert UUID to file {}", file.getPath());
-        throw new IOException(e);
-      }
-    } else {
-      try (BufferedReader bf = new BufferedReader((new FileReader(uuidPath)))) {
-        uuid = bf.readLine();
-      } catch (IOException e) {
-        logger.error("Cannot read UUID from file{}", file.getPath());
-        throw new IOException(e);
-      }
-    }
-    boolean legalConnection;
+  public boolean confirmIdentity(String uuidPath) throws SyncConnectionException {
     try {
-      legalConnection = serviceClient.checkIdentity(uuid,
-          InetAddress.getLocalHost().getHostAddress());
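+      // Ask the receiver whether this sender's host address is allowed to connect.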
+      return serviceClient.checkIdentity(InetAddress.getLocalHost().getHostAddress())
+          == ResultStatus.SUCCESS;
     } catch (Exception e) {
       logger.error("Cannot confirm identity with receiver");
       throw new SyncConnectionException(e);
     }
-    return legalConnection;
-  }
-
-  private String generateUUID() {
-    return Constans.SYNC_SENDER + UUID.randomUUID().toString().replaceAll("-", "");
-  }
-
-  /**
-   * Create snapshots for valid files.
-   */
-  @Override
-  public Set<String> makeFileSnapshot(Set<String> validFiles) {
-    Set<String> validFilesSnapshot = new HashSet<>();
-    for (String filePath : validFiles) {
-      try {
-        String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath);
-        File newFile = new File(snapshotFilePath);
-        if (!newFile.getParentFile().exists()) {
-          newFile.getParentFile().mkdirs();
-        }
-        Path link = FileSystems.getDefault().getPath(snapshotFilePath);
-        Path target = FileSystems.getDefault().getPath(filePath);
-        Files.createLink(link, target);
-        validFilesSnapshot.add(snapshotFilePath);
-      } catch (IOException e) {
-        logger.error("Can not make snapshot for file {}", filePath);
-      }
-    }
-    return validFilesSnapshot;
   }
 
   /**
@@ -521,14 +490,6 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
-  private File getSchemaPosFile() {
-    return new File(config.getSenderPath(), Constans.SCHEMA_POS_FILE_NAME)
-  }
-
-  private File getSchemaLogFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
-        MetadataConstant.METADATA_LOG);
-  }
 
   private boolean checkMD5ForSchema(String md5OfSender) throws TException {
     String md5OfReceiver = serviceClient.checkDataMD5(md5OfSender);
@@ -536,7 +497,8 @@ public class DataTransferManager implements IDataTransferManager {
       logger.info("Receiver has received schema successfully, retry.");
       return true;
     } else {
-      logger.error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
+      logger
+          .error("MD5 check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
       return false;
     }
   }
@@ -566,49 +528,37 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
-  /**
-   * The method is to verify whether the client lock file is locked or not, ensuring that only one
-   * client is running.
-   */
-  private void verifySingleton() throws IOException {
-    File lockFile = new File(config.getLockFilePath());
-    if (!lockFile.getParentFile().exists()) {
-      lockFile.getParentFile().mkdirs();
-    }
-    if (!lockFile.exists()) {
-      lockFile.createNewFile();
+  private void clearSyncLog() {
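+    // Drop files that were successfully deleted or synced in this task from the record of last local files.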
+    for (Entry<String, Set<File>> entry : lastLocalFilesMap.entrySet()) {
+      entry.getValue()
+          .removeAll(successDeletedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
+      entry.getValue()
+          .removeAll(sucessSyncedFilesMap.getOrDefault(entry.getKey(), new HashSet<>()));
     }
-    if (!lockInstance(config.getLockFilePath())) {
-      logger.error("Sync client is already running.");
-      System.exit(1);
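+    // Rewrite the remaining file list into the current log file, then rename it over the last-file record.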
+    File currentLocalFile = getCurrentLogFile();
+    File lastLocalFile = new File(config.getLastFileInfo());
+    try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
+      for (Set<File> currentLocalFiles : lastLocalFilesMap.values()) {
+        for (File file : currentLocalFiles) {
+          bw.write(file.getAbsolutePath());
+          bw.newLine();
+        }
+        bw.flush();
+      }
+    } catch (IOException e) {
+      logger.error("Can not clear sync log {}", lastLocalFile.getAbsoluteFile(), e);
     }
+    currentLocalFile.renameTo(lastLocalFile);
   }
 
-  /**
-   * Try to lock lockfile. if failed, it means that sync client has benn started.
-   *
-   * @param lockFile path of lock file
-   */
-  private boolean lockInstance(final String lockFile) {
-    try {
-      final File file = new File(lockFile);
-      final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
-      final FileLock fileLock = randomAccessFile.getChannel().tryLock();
-      if (fileLock != null) {
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-          try {
-            fileLock.release();
-            randomAccessFile.close();
-          } catch (Exception e) {
-            logger.error("Unable to remove lock file: {}", lockFile, e);
-          }
-        }));
-        return true;
-      }
-    } catch (Exception e) {
-      logger.error("Unable to create and/or lock file: {}", lockFile, e);
-    }
-    return false;
+
+  private File getSchemaPosFile() {
+    return new File(config.getSenderPath(), Constans.SCHEMA_POS_FILE_NAME);
+  }
+
+  private File getSchemaLogFile() {
+    return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
+        MetadataConstant.METADATA_LOG);
   }
 
   private static class InstanceHolder {
@@ -621,7 +571,7 @@ public class DataTransferManager implements IDataTransferManager {
   }
 
   private File getCurrentLogFile() {
-    return new File(config.getSenderPath(), Constans.CURRENT_SYNC_LOG_NAME);
+    return new File(config.getSenderPath(), Constans.CURRENT_LOCAL_FILE_NAME);
   }
 
   public void setConfig(SyncSenderConfig config) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
index 82b0d00..3366b91 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/IDataTransferManager.java
@@ -45,22 +45,18 @@ public interface IDataTransferManager {
   boolean confirmIdentity(String uuidPath) throws SyncConnectionException, IOException;
 
   /**
-   * Make file snapshots before sending files.
-   */
-  Set<String> makeFileSnapshot(Set<String> validFiles);
-
-  /**
    * Send schema file to receiver.
    */
   void syncSchema() throws SyncConnectionException, TException;
 
   void syncDeletedFilesName(String sgName, Set<File> deletedFilesName)
-      throws SyncConnectionException;
+      throws SyncConnectionException, IOException;
 
   /**
    * For all valid files, send it to receiver side and load these data in receiver.
    */
-  void syncDataFilesInOneGroup(String sgName, Set<File> deletedFilesName) throws SyncConnectionException;
+  void syncDataFilesInOneGroup(String sgName, Set<File> dataFiles)
+      throws SyncConnectionException, IOException;
 
   /**
    * Execute a sync task.
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
index 8e7a9e8..13247cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
@@ -37,14 +37,13 @@ public class SyncUtils {
    * multiple directories, it's necessary to make a snapshot in the same disk. It's used by sync
    * sender.
    */
-  public static String getSnapshotFilePath(String filePath) {
-    String relativeFilePath =
-        new File(filePath).getParent() + File.separator + new File(filePath).getName();
+  public static File getSnapshotFile(File file) {
+    String relativeFilePath = file.getParent() + File.separator + file.getName();
     String snapshotDir = SyncSenderDescriptor.getInstance().getConfig().getSnapshotPath();
     if (!new File(snapshotDir).exists()) {
       new File(snapshotDir).mkdirs();
     }
-    return new File(snapshotDir, relativeFilePath).getAbsolutePath();
+    return new File(snapshotDir, relativeFilePath);
   }
 
   /**