You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/03/08 03:31:35 UTC

[iotdb] branch new_sync updated: The transport part of new sync (#5162)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch new_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/new_sync by this push:
     new 510a9b8  The transport part of new sync (#5162)
510a9b8 is described below

commit 510a9b8247d5dc9a9fc3df7dc2cc04ca3b277285
Author: Irvine <ir...@gmail.com>
AuthorDate: Tue Mar 8 11:28:30 2022 +0800

    The transport part of new sync (#5162)
    
    Co-authored-by: wangjunqing <wa...@alibaba-inc.com>
---
 .../newsync/transport/client/ITransportClient.java |   3 +
 .../newsync/transport/client/TransportClient.java  | 494 +++++++++++++++++++++
 .../db/newsync/transport/conf/TransportConfig.java |  39 ++
 .../newsync/transport/conf/TransportConstant.java  |  17 +
 .../transport/server/TransportServerManager.java   | 115 +++++
 .../server/TransportServerManagerMBean.java        |  15 +
 .../server/TransportServerThriftHandler.java       |  33 ++
 .../transport/server/TransportServiceImpl.java     | 286 ++++++++++++
 thrift-sync/src/main/thrift/transport.thrift       |  73 +++
 9 files changed, 1075 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/ITransportClient.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/ITransportClient.java
new file mode 100644
index 0000000..0499fb1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/ITransportClient.java
@@ -0,0 +1,3 @@
+package org.apache.iotdb.db.newsync.transport.client;
+
+public interface ITransportClient {}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java
new file mode 100644
index 0000000..037543b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/client/TransportClient.java
@@ -0,0 +1,494 @@
+package org.apache.iotdb.db.newsync.transport.client;
+
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.db.newsync.pipedata.TsFilePipeData;
+import org.apache.iotdb.db.newsync.sender.pipe.Pipe;
+import org.apache.iotdb.db.newsync.transport.conf.TransportConstant;
+import org.apache.iotdb.db.sync.conf.SyncConstant;
+import org.apache.iotdb.db.sync.conf.SyncSenderConfig;
+import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.transfer.SyncClient;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.service.transport.thrift.IdentityInfo;
+import org.apache.iotdb.service.transport.thrift.MetaInfo;
+import org.apache.iotdb.service.transport.thrift.SyncRequest;
+import org.apache.iotdb.service.transport.thrift.SyncResponse;
+import org.apache.iotdb.service.transport.thrift.TransportService;
+import org.apache.iotdb.service.transport.thrift.TransportStatus;
+import org.apache.iotdb.service.transport.thrift.Type;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.UUID;
+
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConfig.isCheckFileDegistAgain;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.REBASE_CODE;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.RETRY_CODE;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.SUCCESS_CODE;
+
+/**
+ * Sender-side client of the new sync transport. It handshakes with a receiver over Thrift and
+ * ships {@link PipeData} (plus the files a {@link TsFilePipeData} refers to) to it.
+ *
+ * <p>The connection state (transport, service client, identity) is kept in plain mutable fields
+ * with no synchronization.
+ */
+public class TransportClient implements ITransportClient, Runnable {
+
+  private static final Logger logger = LoggerFactory.getLogger(TransportClient.class);
+
+  // TODO: Need to change to transport config
+  private static final SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
+
+  private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
+
+  /** Timeout handed to the Thrift transport factory (milliseconds, going by the name). */
+  private static final int TIMEOUT_MS = 2000_000;
+
+  private TTransport transport = null;
+
+  private TransportService.Client serviceClient = null;
+
+  private String ipAddress = null;
+
+  private int port = -1;
+
+  private String uuid = null;
+
+  private IdentityInfo identityInfo = null;
+
+  private Pipe pipe = null;
+
+  /** Creates the test singleton; the receiver address is supplied via {@link #setServerConfig}. */
+  @TestOnly
+  private TransportClient() {
+    Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
+  }
+
+  @TestOnly
+  public static TransportClient getInstance() {
+    return TransportClient.InstanceHolder.INSTANCE;
+  }
+
+  /**
+   * Sets the receiver address and loads (or creates) the sender UUID.
+   *
+   * @param ipAddress receiver host
+   * @param port receiver port
+   * @throws IOException if the UUID file cannot be read or written
+   */
+  @TestOnly
+  public void setServerConfig(String ipAddress, int port) throws IOException {
+    this.ipAddress = ipAddress;
+    this.port = port;
+    this.uuid = getOrCreateUUID(getUuidFile());
+  }
+
+  /**
+   * Creates a client bound to the given pipe and receiver and immediately tries to handshake.
+   *
+   * @param pipe pipe this client belongs to
+   * @param ipAddress receiver host
+   * @param port receiver port
+   * @throws IOException if the UUID file cannot be read or written
+   */
+  public TransportClient(Pipe pipe, String ipAddress, int port) throws IOException {
+    Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
+    RpcTransportFactory.setThriftMaxFrameSize(ioTDBConfig.getThriftMaxFrameSize());
+
+    this.pipe = pipe;
+    this.ipAddress = ipAddress;
+    this.port = port;
+    this.uuid = getOrCreateUUID(getUuidFile());
+
+    // NOTE: the result is not inspected here; a failed handshake is only logged by handshake().
+    handshake();
+  }
+
+  /**
+   * Repeats {@link #handshakeWithVersion()} until it succeeds or the configured retry limit is
+   * exceeded.
+   *
+   * @return {@code true} once a handshake succeeded, {@code false} after too many failures
+   */
+  private boolean handshake() {
+    int handshakeCounter = 0;
+    while (!handshakeWithVersion()) {
+      handshakeCounter++;
+      if (handshakeCounter > config.getMaxNumOfSyncFileRetry()) {
+        logger.error(
+            "Handshake failed {} times! Check network.", config.getMaxNumOfSyncFileRetry());
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Opens a fresh Thrift connection to the receiver and sends this sender's identity.
+   *
+   * @return {@code true} if the receiver answered with {@code SUCCESS_CODE}; {@code false} on any
+   *     connection or protocol error (which is logged)
+   */
+  private boolean handshakeWithVersion() {
+
+    if (transport != null && transport.isOpen()) {
+      transport.close();
+    }
+
+    // The short-lived socket is only used to learn the local address that reaches the receiver.
+    try (Socket socket = new Socket(this.ipAddress, this.port)) {
+      // Connect to the receiver this client was created for, i.e. the same endpoint the probe
+      // socket above uses, instead of the address found in the legacy sync sender config.
+      transport = RpcTransportFactory.INSTANCE.getTransport(this.ipAddress, this.port, TIMEOUT_MS);
+      TProtocol protocol;
+      if (ioTDBConfig.isRpcThriftCompressionEnable()) {
+        protocol = new TCompactProtocol(transport);
+      } else {
+        protocol = new TBinaryProtocol(transport);
+      }
+      serviceClient = new TransportService.Client(protocol);
+
+      // Underlay socket open.
+      if (!transport.isOpen()) {
+        transport.open();
+      }
+
+      identityInfo =
+          new IdentityInfo(
+              socket.getLocalAddress().getHostAddress(),
+              this.uuid,
+              ioTDBConfig.getIoTDBMajorVersion());
+      TransportStatus status = serviceClient.handshake(identityInfo);
+      if (status.code != SUCCESS_CODE) {
+        throw new SyncConnectionException(
+            "The receiver rejected the synchronization task because " + status.msg);
+      }
+    } catch (TTransportException e) {
+      logger.error("Cannot connect to the receiver. ", e);
+      // TODO: Do actions with exception.
+      return false;
+    } catch (SyncConnectionException | TException | IOException e) {
+      logger.error("Cannot confirm identity with the receiver. ", e);
+      // TODO: Do actions with exception.
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Sends one pipe data item, including its files when it is a {@link TsFilePipeData}.
+   *
+   * <p>A {@link SyncConnectionException} triggers a new handshake followed by another attempt, up
+   * to the configured retry limit.
+   *
+   * @param pipeData item to send
+   * @return {@code true} if the item was sent, {@code false} if sending was given up
+   */
+  private boolean senderTransport(PipeData pipeData) {
+
+    int retryCount = 0;
+
+    while (true) {
+      retryCount++;
+      if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+        logger.error("After {} tries, stop the transport of current pipeData!", retryCount);
+        return false;
+      }
+
+      try {
+        if (pipeData instanceof TsFilePipeData) {
+          for (File file : ((TsFilePipeData) pipeData).getTsFiles()) {
+            transportSingleFile(file);
+          }
+        }
+        transportPipeData(pipeData);
+        logger.info("Finish current pipeData transport!");
+        break;
+      } catch (SyncConnectionException e) {
+        // handshake and retry
+        handshake();
+      } catch (IOException | NoSuchAlgorithmException e) {
+        logger.error("Transport failed. ", e);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Transfer data of a tsfile to the receiver.
+   *
+   * <p>The file is sent piece by piece. When {@code isCheckFileDegistAgain} is set, a SHA-256
+   * digest of the whole file is sent afterwards, and the file is transferred again if the receiver
+   * does not answer with {@code SUCCESS_CODE}.
+   *
+   * @param file file to send
+   * @throws SyncConnectionException if an RPC keeps failing beyond the configured retry limit
+   * @throws IOException if the file cannot be read for the whole-file digest
+   * @throws NoSuchAlgorithmException if SHA-256 is unavailable
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  private void transportSingleFile(File file)
+      throws SyncConnectionException, IOException, NoSuchAlgorithmException {
+
+    MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
+
+    while (true) {
+      transportSingleFilePieceByPiece(file, messageDigest);
+
+      if (isCheckFileDegistAgain) {
+        // Check file digest as entirety.
+        messageDigest.reset();
+        try (InputStream inputStream = new FileInputStream(file)) {
+          byte[] block = new byte[TransportConstant.DATA_CHUNK_SIZE];
+          int length;
+          while ((length = inputStream.read(block)) > 0) {
+            messageDigest.update(block, 0, length);
+          }
+        }
+        // digest() resets the MessageDigest, so compute the hash once and reuse it on retries.
+        byte[] fileDigest = messageDigest.digest();
+        MetaInfo metaInfo = new MetaInfo(Type.FILE, file.getName(), 0);
+
+        TransportStatus status = null;
+
+        int retryCount = 0;
+
+        while (true) {
+          retryCount++;
+          if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+            throw new SyncConnectionException(
+                String.format(
+                    "Can not sync file %s after %s tries.",
+                    file.getAbsoluteFile(), config.getMaxNumOfSyncFileRetry()));
+          }
+          try {
+            status =
+                serviceClient.checkFileDigest(identityInfo, metaInfo, ByteBuffer.wrap(fileDigest));
+          } catch (TException e) {
+            // retry
+            logger.error("TException happens! ", e);
+            continue;
+          }
+          break;
+        }
+
+        if (status.code != SUCCESS_CODE) {
+          logger.error("Digest check of tsfile {} failed, retry", file.getAbsoluteFile());
+          continue;
+        }
+      }
+      break;
+    }
+
+    logger.info("Receiver has received {} successfully.", file.getAbsoluteFile());
+  }
+
+  /**
+   * Sends the file in chunks, each accompanied by its own SHA-256 digest.
+   *
+   * <p>The first chunk is at most 1 MiB; later chunks use {@code DATA_CHUNK_SIZE}. On {@code
+   * REBASE_CODE} the read position is taken from the status message; on {@code RETRY_CODE} the
+   * current position is sent again. The method returns once the end of the file is reached.
+   *
+   * @param file file to send
+   * @param messageDigest digest instance reused for every chunk
+   * @throws SyncConnectionException if an RPC or a local read keeps failing beyond the configured
+   *     retry limit
+   */
+  private void transportSingleFilePieceByPiece(File file, MessageDigest messageDigest)
+      throws SyncConnectionException {
+
+    // Cut the file into pieces to send
+    long position = 0;
+
+    // Try small piece to rebase the file position.
+    byte[] buffer = new byte[1024 * 1024];
+
+    // Local read failures so far; bounded so an unreadable or missing file cannot spin forever.
+    int ioFailureCount = 0;
+
+    outer:
+    while (true) {
+
+      // Normal piece.
+      if (position != 0L && buffer.length != TransportConstant.DATA_CHUNK_SIZE) {
+        buffer = new byte[TransportConstant.DATA_CHUNK_SIZE];
+      }
+
+      int dataLength;
+      try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
+
+        randomAccessFile.seek(position);
+        while ((dataLength = randomAccessFile.read(buffer)) != -1) {
+          // Copy exactly the bytes read, so the read buffer can be reused for the next chunk.
+          byte[] piece = new byte[dataLength];
+          System.arraycopy(buffer, 0, piece, 0, dataLength);
+
+          messageDigest.reset();
+          messageDigest.update(piece, 0, dataLength);
+          // digest() resets the MessageDigest, so compute the hash once and reuse it on retries.
+          byte[] pieceDigest = messageDigest.digest();
+          MetaInfo metaInfo = new MetaInfo(Type.FILE, file.getName(), position);
+
+          TransportStatus status = null;
+          int retryCount = 0;
+          while (true) {
+            retryCount++;
+            if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+              throw new SyncConnectionException(
+                  String.format(
+                      "Can not sync file %s after %s tries.",
+                      file.getAbsoluteFile(), config.getMaxNumOfSyncFileRetry()));
+            }
+            try {
+              status =
+                  serviceClient.transportData(
+                      identityInfo,
+                      metaInfo,
+                      ByteBuffer.wrap(piece),
+                      ByteBuffer.wrap(pieceDigest));
+            } catch (TException e) {
+              // retry
+              logger.error("TException happened! ", e);
+              continue;
+            }
+            break;
+          }
+
+          if (status.code == REBASE_CODE) {
+            position = Long.parseLong(status.msg);
+            continue outer;
+          } else if (status.code == RETRY_CODE) {
+            logger.info(
+                "Receiver failed to receive data from {} because {}, retry.",
+                file.getAbsoluteFile(),
+                status.msg);
+            continue outer;
+          } else if (status.code != SUCCESS_CODE) {
+            logger.info(
+                "Receiver failed to receive data from {} because {}, abort.",
+                file.getAbsoluteFile(),
+                status.msg);
+            throw new RuntimeException("Error! Replace this exception!");
+          } else { // Success
+            position += dataLength;
+          }
+        }
+        // End of file reached: every piece was accepted, so leave the retry loop.
+        return;
+      } catch (IOException e) {
+        // retry
+        logger.error("IOException happened! ", e);
+        ioFailureCount++;
+        if (ioFailureCount > config.getMaxNumOfSyncFileRetry()) {
+          throw new SyncConnectionException(
+              String.format(
+                  "Can not read file %s after %s tries.",
+                  file.getAbsoluteFile(), config.getMaxNumOfSyncFileRetry()));
+        }
+      } catch (SyncConnectionException e) {
+        logger.error("Cannot sync data with receiver. ", e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Serializes the pipe data and sends it together with the SHA-256 digest of the serialized
+   * bytes, retrying up to the configured limit.
+   *
+   * @param pipeData item to send
+   * @throws SyncConnectionException if the item could not be sent within the retry limit
+   * @throws NoSuchAlgorithmException if SHA-256 is unavailable
+   */
+  private void transportPipeData(PipeData pipeData)
+      throws SyncConnectionException, NoSuchAlgorithmException {
+
+    MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
+
+    int retryCount = 0;
+    while (true) {
+
+      retryCount++;
+      if (retryCount > config.getMaxNumOfSyncFileRetry()) {
+        throw new SyncConnectionException(
+            String.format(
+                "Can not sync pipe data after %s tries.", config.getMaxNumOfSyncFileRetry()));
+      }
+
+      try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+          DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+        pipeData.serialize(dataOutputStream);
+        dataOutputStream.flush();
+
+        // Send and hash the serialized bytes themselves (not a zero-filled array of equal length).
+        byte[] buffer = byteArrayOutputStream.toByteArray();
+        messageDigest.reset();
+        messageDigest.update(buffer, 0, buffer.length);
+        ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
+
+        MetaInfo metaInfo =
+            new MetaInfo(Type.findByValue(pipeData.getType().ordinal()), "fileName", 0);
+        TransportStatus status =
+            serviceClient.transportData(
+                identityInfo, metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
+
+        if (status.code == SUCCESS_CODE) {
+          break;
+        } else {
+          logger.error("Digest check of pipeData failed, retry");
+        }
+      } catch (IOException | TException e) {
+        // retry
+        logger.error("Exception happened!", e);
+      }
+    }
+  }
+
+  /**
+   * UUID marks the identity of sender for receiver.
+   *
+   * <p>Returns the first line of {@code uuidFile} when it is non-empty; otherwise generates a new
+   * UUID, writes it to the file and returns it.
+   *
+   * @param uuidFile file holding the UUID
+   * @return the stored or newly generated UUID
+   * @throws IOException if the file cannot be read or written
+   */
+  private String getOrCreateUUID(File uuidFile) throws IOException {
+    File parentDir = uuidFile.getParentFile();
+    if (parentDir != null && !parentDir.exists()) {
+      parentDir.mkdirs();
+    }
+
+    if (uuidFile.exists()) {
+      String storedUuid;
+      try (BufferedReader bf = new BufferedReader((new FileReader(uuidFile)))) {
+        storedUuid = bf.readLine();
+      } catch (IOException e) {
+        logger.error("Cannot read UUID from file {}", uuidFile.getPath());
+        throw new IOException(e);
+      }
+
+      if ((storedUuid != null) && (storedUuid.length() != 0)) {
+        return storedUuid;
+      }
+      logger.warn("UUID in file {} is empty.", uuidFile.getPath());
+      uuidFile.delete();
+    }
+
+    // uuidFile not exist or uuid in uuidFile is invalid
+    String newUuid = generateUUID();
+    try (FileOutputStream out = new FileOutputStream(uuidFile)) {
+      out.write(newUuid.getBytes());
+    } catch (IOException e) {
+      logger.error("Cannot insert UUID to file {}", uuidFile.getPath());
+      throw new IOException(e);
+    }
+
+    return newUuid;
+  }
+
+  /** Returns a random UUID rendered without dashes. */
+  private String generateUUID() {
+    return UUID.randomUUID().toString().replace("-", "");
+  }
+
+  /** Returns the UUID file located in the configured sync directory. */
+  private File getUuidFile() {
+    return new File(ioTDBConfig.getSyncDir(), SyncConstant.UUID_FILE_NAME);
+  }
+
+  /**
+   * Currently a no-op. The intended loop (pull pipe data, send each item, commit its serial
+   * number, sleep) is kept below as commented-out code.
+   *
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+
+    //    while (true) {
+    //      List<PipeData> pipeDataList = this.pipe.pull(Long.MAX_VALUE);
+    //      for (PipeData pipeData: pipeDataList) {
+    //        boolean sendResult = senderTransport(pipeData);
+    //        if (!sendResult) {
+    //          // Cannot handle the error.
+    //          return;
+    //        }
+    //        this.pipe.commit(pipeData.getSerialNumber());
+    //      }
+    //
+    //      try {
+    //        Thread.sleep(1000);
+    //      } catch (InterruptedException e) {
+    //        Thread.currentThread().interrupt();
+    //        throw new RuntimeException(e);
+    //      }
+    //    }
+
+  }
+
+  /**
+   * Forwards a heartbeat request to the receiver using the identity from the last handshake.
+   *
+   * @param syncRequest request to forward
+   * @return the receiver's response
+   * @throws TException if the RPC fails
+   */
+  public SyncResponse heartbeat(SyncRequest syncRequest) throws TException {
+    return serviceClient.heartbeat(identityInfo, syncRequest);
+  }
+
+  @TestOnly
+  public static void main(String[] args) throws IOException {
+
+    TransportClient.getInstance().setServerConfig("127.0.0.1", 5555);
+
+    if (!TransportClient.getInstance().handshake()) {
+      // Deal with the error here.
+      return;
+    }
+
+    // Example 1. Send TSFILE.
+    //    List<File> files = new ArrayList<>();
+    //    files.add(new File(System.getProperty(IoTDBConstant.IOTDB_HOME) + "/files/test1"));
+    //    files.add(new File(System.getProperty(IoTDBConstant.IOTDB_HOME) + "/files/test2"));
+    //    files.add(new File(System.getProperty(IoTDBConstant.IOTDB_HOME) + "/files/test3"));
+
+    // if (!TransportClient.getInstance().senderTransport()) {
+    // Deal with the error here.
+    // }
+
+    // Example 2. Send DELETION
+    // TsFilePipeData.Type.DELETION.name();
+
+    // Example 3. Send PHYSICALPLAN
+    // TsFilePipeData.Type.PHYSICALPLAN.name();
+  }
+
+  private static class InstanceHolder {
+    private static final TransportClient INSTANCE = new TransportClient();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportConfig.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportConfig.java
new file mode 100644
index 0000000..316f6f5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportConfig.java
@@ -0,0 +1,39 @@
+package org.apache.iotdb.db.newsync.transport.conf;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.sync.conf.SyncConstant;
+
+import java.io.File;
+
+/** Static configuration helpers for the new sync transport layer. */
+public class TransportConfig {
+
+  /** When {@code true}, the sender additionally checks the digest of each whole file. */
+  public static boolean isCheckFileDegistAgain = false;
+
+  /** default base dir, stores all IoTDB runtime files */
+  private static final String DEFAULT_BASE_DIR = addHomeDir("data");
+
+  private TransportConfig() {}
+
+  /**
+   * Resolves a relative directory against the {@code IOTDB_HOME} system property.
+   *
+   * @param dir directory path, absolute or relative
+   * @return {@code dir} unchanged if it is absolute or no home directory is set, otherwise the
+   *     home directory joined with {@code dir}
+   */
+  private static String addHomeDir(String dir) {
+    String iotdbHome = System.getProperty(IoTDBConstant.IOTDB_HOME, null);
+    if (new File(dir).isAbsolute()) {
+      return dir;
+    }
+    if (iotdbHome == null || iotdbHome.length() == 0) {
+      return dir;
+    }
+    // Insert a separator only when the home directory does not already end with one.
+    return iotdbHome.endsWith(File.separator)
+        ? iotdbHome + dir
+        : iotdbHome + File.separatorChar + dir;
+  }
+
+  /**
+   * Builds the receiver-side data folder path for one sender.
+   *
+   * @param ipAddress sender address
+   * @param uuid sender UUID
+   * @return path composed of the base dir, the sync receiver folder, a per-sender folder named
+   *     from {@code ipAddress} and {@code uuid}, and the receiver data folder
+   */
+  public static String getSyncedDir(String ipAddress, String uuid) {
+    String senderFolder = ipAddress + SyncConstant.SYNC_DIR_NAME_SEPARATOR + uuid;
+    return DEFAULT_BASE_DIR
+        + File.separator
+        + SyncConstant.SYNC_RECEIVER
+        + File.separator
+        + senderFolder
+        + File.separator
+        + SyncConstant.RECEIVER_DATA_FOLDER_NAME;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportConstant.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportConstant.java
new file mode 100644
index 0000000..daf1d0d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/conf/TransportConstant.java
@@ -0,0 +1,17 @@
+package org.apache.iotdb.db.newsync.transport.conf;
+
+import org.apache.iotdb.rpc.RpcUtils;
+
+/** Constants shared by the sender and receiver sides of the new sync transport. */
+public class TransportConstant {
+
+  /** Upper bound for one data chunk before the Thrift frame limit is applied: 16 MiB. */
+  private static final int MAX_CHUNK_BYTES = 16 * 1024 * 1024;
+
+  /** Size in bytes of one data chunk, capped by the Thrift maximum frame size. */
+  public static final int DATA_CHUNK_SIZE =
+      Math.min(MAX_CHUNK_BYTES, RpcUtils.THRIFT_FRAME_MAX_SIZE);
+
+  // Status code values; the sender compares TransportStatus.code against these.
+  public static final int SUCCESS_CODE = 1;
+  public static final int ERROR_CODE = -1;
+  public static final int REBASE_CODE = -2;
+  public static final int RETRY_CODE = -3;
+  public static final int CONFLICT_CODE = -4;
+
+  private TransportConstant() {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java
new file mode 100644
index 0000000..7f9a540
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManager.java
@@ -0,0 +1,115 @@
+package org.apache.iotdb.db.newsync.transport.server;
+
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.thrift.ThriftService;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.service.transport.thrift.TransportService;
+
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TransportServerManager extends ThriftService
+    implements Runnable, TransportServerManagerMBean {
+
+  private static final Logger logger = LoggerFactory.getLogger(TransportServerManager.class);
+  private TransportServiceImpl serviceImpl;
+
+  @Override
+  public void run() {
+    TransportServerManager serverManager = new TransportServerManager();
+    try {
+      serverManager.start();
+    } catch (StartupException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private static class ServiceManagerHolder {
+    private static final TransportServerManager INSTANCE = new TransportServerManager();
+  }
+
+  public static TransportServerManager getInstance() {
+    return TransportServerManager.ServiceManagerHolder.INSTANCE;
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.SYNC_SERVICE;
+  }
+
+  @Override
+  public ThriftService getImplementation() {
+    return getInstance();
+  }
+
+  @Override
+  public void initTProcessor() {
+    initSyncedServiceImpl(null);
+    serviceImpl = new TransportServiceImpl();
+    processor = new TransportService.Processor<>(serviceImpl);
+  }
+
+  @Override
+  public void initThriftServiceThread() {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    thriftServiceThread =
+        new ThriftServiceThread(
+            processor,
+            getID().getName(),
+            ThreadName.SYNC_CLIENT.getName(),
+            config.getRpcAddress(),
+            config.getSyncServerPort(),
+            Integer.MAX_VALUE,
+            config.getThriftServerAwaitTimeForStopService(),
+            new TransportServerThriftHandler(serviceImpl),
+            config.isRpcThriftCompressionEnable());
+    thriftServiceThread.setName(ThreadName.SYNC_SERVER.getName());
+  }
+
+  @Override
+  public String getBindIP() {
+    // TODO: Whether to change this config here
+    return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
+  }
+
+  @Override
+  public int getBindPort() {
+    // TODO: Whether to change this config here
+    return IoTDBDescriptor.getInstance().getConfig().getSyncServerPort();
+  }
+
+  //  @Override
+  public int getRPCPort() {
+    return getBindPort();
+  }
+
+  @Override
+  public void startService() throws StartupException {
+    // TODO: Whether to change this config here
+    if (!IoTDBDescriptor.getInstance().getConfig().isSyncEnable()) {
+      return;
+    }
+    super.startService();
+  }
+
+  @Override
+  public void stopService() {
+    // TODO: Whether to change this config here
+    if (IoTDBDescriptor.getInstance().getConfig().isSyncEnable()) {
+      super.stopService();
+    }
+  }
+
+  @TestOnly
+  public static void main(String[] args) throws TTransportException, StartupException {
+    logger.info("Transport server for testing only.");
+    TransportServerManager serverManager = new TransportServerManager();
+    serverManager.start();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManagerMBean.java
new file mode 100644
index 0000000..fec1947
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerManagerMBean.java
@@ -0,0 +1,15 @@
+package org.apache.iotdb.db.newsync.transport.server;
+
+import org.apache.iotdb.db.exception.StartupException;
+
+/** Management interface implemented by {@code TransportServerManager}. */
+public interface TransportServerManagerMBean {
+
+  // ---- lifecycle ----
+
+  /**
+   * Starts the transport service.
+   *
+   * @throws StartupException if the service cannot be started
+   */
+  void startService() throws StartupException;
+
+  /**
+   * Restarts the transport service.
+   *
+   * @throws StartupException if the service cannot be started again
+   */
+  void restartService() throws StartupException;
+
+  /** Stops the transport service. */
+  void stopService();
+
+  // ---- status ----
+
+  /** Returns a textual status of the RPC service. */
+  String getRPCServiceStatus();
+
+  /** Returns the port of the RPC service. */
+  int getRPCPort();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerThriftHandler.java
new file mode 100644
index 0000000..8a3898c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServerThriftHandler.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.db.newsync.transport.server;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Thrift server event hooks for the transport service. Only {@link #deleteContext} does any work;
+ * the other hooks are empty.
+ */
+public class TransportServerThriftHandler implements TServerEventHandler {
+
+  private TransportServiceImpl serviceImpl;
+
+  /**
+   * @param serviceImpl service implementation to notify in {@link #deleteContext}
+   */
+  TransportServerThriftHandler(TransportServiceImpl serviceImpl) {
+    this.serviceImpl = serviceImpl;
+  }
+
+  /** No preparation is needed before serving. */
+  @Override
+  public void preServe() {}
+
+  /** No per-connection context is kept, so this always returns {@code null}. */
+  @Override
+  public ServerContext createContext(TProtocol in, TProtocol out) {
+    return null;
+  }
+
+  /** Nothing is done per processed request. */
+  @Override
+  public void processContext(ServerContext context, TTransport inTransport, TTransport outTransport) {}
+
+  /** Tells the service implementation that a client has exited. */
+  @Override
+  public void deleteContext(ServerContext context, TProtocol in, TProtocol out) {
+    serviceImpl.handleClientExit();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java
new file mode 100644
index 0000000..0160cf3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/newsync/transport/server/TransportServiceImpl.java
@@ -0,0 +1,286 @@
+package org.apache.iotdb.db.newsync.transport.server;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.newsync.pipedata.PipeData;
+import org.apache.iotdb.service.transport.thrift.IdentityInfo;
+import org.apache.iotdb.service.transport.thrift.MetaInfo;
+import org.apache.iotdb.service.transport.thrift.SyncRequest;
+import org.apache.iotdb.service.transport.thrift.SyncResponse;
+import org.apache.iotdb.service.transport.thrift.TransportService;
+import org.apache.iotdb.service.transport.thrift.TransportStatus;
+import org.apache.iotdb.service.transport.thrift.Type;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.RandomAccessFile;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConfig.getSyncedDir;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.CONFLICT_CODE;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.ERROR_CODE;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.REBASE_CODE;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.RETRY_CODE;
+import static org.apache.iotdb.db.newsync.transport.conf.TransportConstant.SUCCESS_CODE;
+import static org.apache.iotdb.db.sync.conf.SyncConstant.DATA_CHUNK_SIZE;
+
+public class TransportServiceImpl implements TransportService.Iface {
+  private static Logger logger = LoggerFactory.getLogger(TransportServiceImpl.class);
+
+  private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private class CheckResult {
+    boolean result;
+    String index;
+
+    public CheckResult(boolean result, String index) {
+      this.result = result;
+      this.index = index;
+    }
+
+    public boolean isResult() {
+      return result;
+    }
+
+    public String getIndex() {
+      return index;
+    }
+  }
+
+  private CheckResult checkStartIndexValid(File file, long startIndex) throws IOException {
+    File recordFile = new File(file.getAbsolutePath() + ".record");
+
+    if (!recordFile.exists() && startIndex != 0) {
+      logger.error(
+          "The start index {} of data sync is not valid. "
+              + "The file {} is not exist and start index should equal to 0).",
+          startIndex,
+          recordFile.getAbsolutePath());
+      return new CheckResult(false, "0");
+    }
+
+    if (recordFile.exists()) {
+      try (InputStream inputStream = new FileInputStream(recordFile);
+          BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
+        String index = bufferedReader.readLine();
+
+        if ((index == null) || (index.length() == 0)) {
+          if (startIndex != 0) {
+            logger.error(
+                "The start index {} of data sync is not valid. "
+                    + "The file {} is not exist and start index is should equal to 0.",
+                startIndex,
+                recordFile.getAbsolutePath());
+            return new CheckResult(false, "0");
+          }
+        }
+
+        if (Long.parseLong(index) != startIndex) {
+          logger.error(
+              "The start index {} of data sync is not valid. "
+                  + "The start index of the file {} should equal to {}.",
+              startIndex,
+              recordFile.getAbsolutePath(),
+              index);
+          return new CheckResult(false, index);
+        }
+      }
+    }
+
+    return new CheckResult(true, "0");
+  }
+
+  @Override
+  public TransportStatus handshake(IdentityInfo identityInfo) throws TException {
+    logger.debug("Invoke handshake method from client ip = {}", identityInfo.address);
+
+    // Version check
+    if (!config.getIoTDBMajorVersion(identityInfo.version).equals(config.getIoTDBMajorVersion())) {
+      return new TransportStatus(
+          ERROR_CODE,
+          String.format(
+              "Version mismatch: the sender <%s>, the receiver <%s>",
+              identityInfo.version, config.getIoTDBVersion()));
+    }
+
+    if (!new File(getSyncedDir(identityInfo.getAddress(), identityInfo.getUuid())).exists()) {
+      new File(getSyncedDir(identityInfo.getAddress(), identityInfo.getUuid())).mkdirs();
+    }
+    return new TransportStatus(SUCCESS_CODE, "");
+  }
+
+  @Override
+  public TransportStatus transportData(
+      IdentityInfo identityInfo, MetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest) {
+    logger.debug("Invoke transportData method from client ip = {}", identityInfo.address);
+
+    String ipAddress = identityInfo.address;
+    String uuid = identityInfo.uuid;
+    synchronized (uuid.intern()) {
+      Type type = metaInfo.type;
+      String fileName = metaInfo.fileName;
+      long startIndex = metaInfo.startIndex;
+
+      // Check file start index valid
+      if (type == Type.FILE) {
+        try {
+          CheckResult result =
+              checkStartIndexValid(new File(getSyncedDir(ipAddress, uuid), fileName), startIndex);
+          if (!result.isResult()) {
+            return new TransportStatus(REBASE_CODE, result.getIndex());
+          }
+        } catch (IOException e) {
+          logger.error(e.getMessage());
+          return new TransportStatus(ERROR_CODE, e.getMessage());
+        }
+      }
+
+      // Check buff digest
+      int pos = buff.position();
+      MessageDigest messageDigest = null;
+      try {
+        messageDigest = MessageDigest.getInstance("SHA-256");
+      } catch (NoSuchAlgorithmException e) {
+        logger.error(e.getMessage());
+        return new TransportStatus(ERROR_CODE, e.getMessage());
+      }
+      messageDigest.update(buff);
+      byte[] digestBytes = new byte[digest.capacity()];
+      digest.get(digestBytes);
+      if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
+        return new TransportStatus(RETRY_CODE, "Data digest check error, retry.");
+      }
+
+      if (type != Type.FILE) {
+
+        buff.position(pos);
+        int length = buff.capacity();
+        byte[] byteArray = new byte[length];
+        buff.get(byteArray);
+        try (InputStream inputStream = new ByteArrayInputStream(byteArray);
+            DataInputStream dataInputStream = new DataInputStream(inputStream)) {
+          PipeData pipeData = PipeData.deserialize(dataInputStream);
+          // Do with file
+          // BufferedPipeDataQueue.offer(pipeData);
+        } catch (IOException | IllegalPathException e) {
+          e.printStackTrace();
+        }
+      } else {
+        // Write buff to {file}.patch
+        buff.position(pos);
+        File file = new File(getSyncedDir(ipAddress, uuid), fileName + ".patch");
+        try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) {
+          randomAccessFile.seek(startIndex);
+          int length = buff.capacity();
+          byte[] byteArray = new byte[length];
+          buff.get(byteArray);
+          randomAccessFile.write(byteArray);
+          writeRecordFile(
+              new File(getSyncedDir(ipAddress, uuid), fileName + ".record"), startIndex + length);
+          logger.debug(
+              "Sync "
+                  + fileName
+                  + " start at "
+                  + startIndex
+                  + " to "
+                  + (startIndex + length)
+                  + " is done.");
+        } catch (IOException e) {
+          logger.error(e.getMessage());
+          e.printStackTrace();
+          return new TransportStatus(ERROR_CODE, e.getMessage());
+        }
+      }
+    }
+    return new TransportStatus(SUCCESS_CODE, "");
+  }
+
+  @Override
+  public TransportStatus checkFileDigest(
+      IdentityInfo identityInfo, MetaInfo metaInfo, ByteBuffer digest) throws TException {
+    logger.debug("Invoke checkFileDigest method from client ip = {}", identityInfo.address);
+
+    String ipAddress = identityInfo.getAddress();
+    String uuid = identityInfo.getUuid();
+    synchronized (uuid.intern()) {
+      String fileName = metaInfo.fileName;
+      MessageDigest messageDigest = null;
+      try {
+        messageDigest = MessageDigest.getInstance("SHA-256");
+      } catch (NoSuchAlgorithmException e) {
+        logger.error(e.getMessage());
+        return new TransportStatus(ERROR_CODE, e.getMessage());
+      }
+
+      try (InputStream inputStream =
+          new FileInputStream(new File(getSyncedDir(ipAddress, uuid), fileName + ".patch"))) {
+        byte[] block = new byte[DATA_CHUNK_SIZE];
+        int length;
+        while ((length = inputStream.read(block)) > 0) {
+          messageDigest.update(block, 0, length);
+        }
+
+        String localDigest = (new BigInteger(1, messageDigest.digest())).toString(16);
+        byte[] digestBytes = new byte[digest.capacity()];
+        digest.get(digestBytes);
+        if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
+          logger.error(
+              "The file {} digest check error. "
+                  + "The local digest is {} (should be equal to {}).",
+              fileName,
+              localDigest,
+              digest);
+          new File(getSyncedDir(ipAddress, uuid), fileName + ".record").delete();
+          return new TransportStatus(CONFLICT_CODE, "File digest check error.");
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        return new TransportStatus(ERROR_CODE, e.getMessage());
+      }
+
+      return new TransportStatus(SUCCESS_CODE, "");
+    }
+  }
+
+  @Override
+  public SyncResponse heartbeat(IdentityInfo identityInfo, SyncRequest syncRequest)
+      throws TException {
+
+    // return ReceiverService.getInstance().recMsg(syncRequest);
+    return null;
+  }
+
+  private void writeRecordFile(File recordFile, long position) throws IOException {
+    File tmpFile = new File(recordFile.getAbsolutePath() + ".tmp");
+    FileWriter fileWriter = new FileWriter(tmpFile, false);
+    fileWriter.write(String.valueOf(position));
+    fileWriter.close();
+    Files.move(tmpFile.toPath(), recordFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+  }
+
+  /**
+   * release resources or cleanup when a client (a sender) is disconnected (normally or abnormally).
+   */
+  public void handleClientExit() {
+    // TODO: Handle client exit here.
+    // do nothing now
+  }
+}
diff --git a/thrift-sync/src/main/thrift/transport.thrift b/thrift-sync/src/main/thrift/transport.thrift
new file mode 100644
index 0000000..3eccde3
--- /dev/null
+++ b/thrift-sync/src/main/thrift/transport.thrift
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+namespace java org.apache.iotdb.service.transport.thrift
+namespace py iotdb.thrift.transport
+
+// Result returned by the receiver for handshake, transportData and checkFileDigest.
+struct TransportStatus{
+  // Status code. The Java receiver in this change returns the TransportConstant values
+  // SUCCESS_CODE, ERROR_CODE, REBASE_CODE, RETRY_CODE and CONFLICT_CODE.
+  1:required i32 code
+  // Message that accompanies the code. With REBASE_CODE the receiver puts the start index it
+  // expects here; with SUCCESS_CODE it sends an empty string.
+  2:required string msg
+}
+
+// Identity of the sender. The sender and receiver need to check some info to confirm validity.
+struct IdentityInfo{
+  // Address (ip) of the sender, intended for checking it against the white list of the
+  // receiver. The Java receiver in this change also uses it, together with uuid, to locate the
+  // directory in which the received data is stored.
+  1:required string address
+
+  // Sender needs to tell receiver its identity.
+  2:required string uuid
+
+  // Version of the sender. The receiver's handshake compares the major versions of sender and
+  // receiver and rejects the sender if they differ.
+  3:required string version
+}
+
+// Kind of payload carried by transportData. The Java receiver in this change only distinguishes
+// FILE (payload is a slice of a file, written at MetaInfo.startIndex) from every other value
+// (payload is deserialized as a PipeData).
+enum Type {
+  TSFILE,
+  DELETION,
+  PHYSICALPLAN,
+  FILE
+}
+
+// Describes the payload of one transportData / checkFileDigest call.
+struct MetaInfo{
+  // The type of the pipeData in sending.
+  1:required Type type
+
+  // The name of the file in sending.
+  2:required string fileName
+
+  // The start index of the file slice in sending, i.e. the offset in the file at which the
+  // receiver writes the slice. The receiver answers with REBASE_CODE if this is not the offset
+  // it expects next.
+  3:required i64 startIndex
+}
+
+// Request of the heartbeat call.
+// NOTE(review): the meaning of code and msg is not defined in this file - document once the
+// heartbeat is implemented.
+struct SyncRequest{
+  1:required i32 code
+  2:required string msg
+}
+
+// Response of the heartbeat call.
+// NOTE(review): the meaning of code and msg is not defined in this file - document once the
+// heartbeat is implemented.
+struct SyncResponse{
+  1:required i32 code
+  2:required string msg
+}
+
+// RPC interface offered by the receiver to the sender.
+service TransportService{
+  // First call of a sender: the receiver checks the version of the sender.
+  // NOTE(review): unlike the other methods, the argument has no explicit field id - confirm
+  // this is intended.
+  TransportStatus handshake(IdentityInfo info);
+  // Sends one payload (buff) together with its digest. For Type.FILE the payload is the slice
+  // of metaInfo.fileName that starts at metaInfo.startIndex.
+  TransportStatus transportData(1:IdentityInfo identityInfo, 2:MetaInfo metaInfo, 3:binary buff, 4:binary digest);
+  // Asks the receiver to compare the digest of the received file metaInfo.fileName with digest.
+  TransportStatus checkFileDigest(1:IdentityInfo identityInfo, 2:MetaInfo metaInfo, 3:binary digest);
+  // TransportStatus finishTransportFile(1:IdentityInfo identityInfo, 2:MetaInfo metaInfo)
+  // Heartbeat between sender and receiver; the Java receiver in this change does not implement
+  // it yet.
+  SyncResponse heartbeat(1:IdentityInfo identityInfo, 2:SyncRequest syncRequest)
+}