You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/17 07:51:58 UTC

[iotdb] branch master updated: [IOTDB-4135] Merge thrift-sync into ClientRPC (#7004)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 42b00d63c1 [IOTDB-4135] Merge thrift-sync into ClientRPC (#7004)
42b00d63c1 is described below

commit 42b00d63c17a0f232fc97253f737c59bc48be27a
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Wed Aug 17 15:51:53 2022 +0800

    [IOTDB-4135] Merge thrift-sync into ClientRPC (#7004)
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  15 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   4 -
 .../db/integration/sync/IoTDBSyncReceiverIT.java   |  25 +--
 node-commons/pom.xml                               |   5 -
 .../apache/iotdb/commons/sync/SyncConstant.java    |   6 -
 .../apache/iotdb/commons/sync/SyncPathUtil.java    |   6 +-
 pom.xml                                            |   1 -
 .../resources/conf/iotdb-datanode.properties       |   4 -
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  11 --
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   9 -
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  31 ----
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   2 -
 .../db/qp/logical/sys/ShowPipeServerOperator.java  |  38 -----
 .../db/qp/logical/sys/StartPipeServerOperator.java |  38 -----
 .../db/qp/logical/sys/StopPipeServerOperator.java  |  38 -----
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   8 -
 .../db/qp/physical/sys/ShowPipeServerPlan.java     |  26 ---
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |   1 -
 .../db/qp/physical/sys/StartPipeServerPlan.java    |  56 -------
 .../db/qp/physical/sys/StopPipeServerPlan.java     |  56 -------
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  18 --
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  23 +++
 .../db/service/thrift/impl/TSServiceImpl.java      |  21 +++
 .../java/org/apache/iotdb/db/sync/SyncService.java | 127 ++++++--------
 .../iotdb/db/sync/common/ISyncInfoFetcher.java     |   8 -
 .../iotdb/db/sync/common/LocalSyncInfoFetcher.java |  28 ----
 .../org/apache/iotdb/db/sync/common/SyncInfo.java  |  33 +---
 .../db/sync/common/persistence/SyncLogReader.java  |  13 --
 .../db/sync/common/persistence/SyncLogWriter.java  |  14 --
 .../db/sync/sender/service/TransportHandler.java   |   4 +-
 .../db/sync/transport/client/ClientWrapper.java    |  25 ++-
 ...rtClient.java => IoTDBSinkTransportClient.java} |  52 +++---
 ...nsportServiceImpl.java => ReceiverManager.java} | 183 +++++++++++++--------
 .../transport/server/TransportServerManager.java   | 137 ---------------
 .../server/TransportServerManagerMBean.java        |  34 ----
 .../server/TransportServerThriftHandler.java       |  71 --------
 .../iotdb/db/qp/physical/PhysicalPlanTest.java     |  28 ----
 .../db/sync/receiver/manager/SyncInfoTest.java     |   1 -
 .../db/sync/receiver/recovery/SyncLogTest.java     |   4 -
 ...portServiceTest.java => SyncTransportTest.java} |  21 +--
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +
 thrift-sync/README.md                              |  22 ---
 thrift-sync/pom.xml                                |  62 -------
 thrift-sync/rpc-changelist.md                      | 181 --------------------
 thrift-sync/src/main/thrift/transport.thrift       |  63 -------
 thrift/src/main/thrift/client.thrift               |  33 ++++
 46 files changed, 310 insertions(+), 1279 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 18c4115d9c..71f210c4c9 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -65,8 +65,7 @@ utilityStatement
     | loadConfiguration | loadTimeseries | loadFile | removeFile | unloadFile;
 
 syncStatement
-    : startPipeServer | stopPipeServer | showPipeServer
-    | createPipeSink | showPipeSinkType | showPipeSink | dropPipeSink
+    : createPipeSink | showPipeSinkType | showPipeSink | dropPipeSink
     | createPipe | showPipe | stopPipe | startPipe | dropPipe;
 
 /**
@@ -764,18 +763,6 @@ syncAttributeClauses
     : attributePair (COMMA attributePair)*
     ;
 
-// sync receiver
-startPipeServer
-    : START PIPESERVER
-    ;
-
-stopPipeServer
-    : STOP PIPESERVER
-    ;
-
-showPipeServer
-    : SHOW PIPESERVER
-    ;
 
 /**
  * 7. Common Clauses
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 5ad902f8e5..65efbcecf8 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -379,10 +379,6 @@ PIPES
     : P I P E S
     ;
 
-PIPESERVER
-    : P I P E S E R V E R
-    ;
-
 PIPESINK
     : P I P E S I N K
     ;
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 016ccb84c7..67a4ab4618 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -21,19 +21,17 @@ package org.apache.iotdb.db.integration.sync;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.exception.sync.PipeServerException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.sync.transport.client.IoTDBSInkTransportClient;
+import org.apache.iotdb.db.sync.transport.client.IoTDBSinkTransportClient;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -51,7 +49,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.net.Socket;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -71,7 +68,7 @@ public class IoTDBSyncReceiverIT {
   String remoteIp1;
   long createdTime1 = System.currentTimeMillis();
   String showPipeSql = "SHOW PIPE";
-  IoTDBSInkTransportClient client;
+  IoTDBSinkTransportClient client;
 
   @Before
   public void setUp() throws Exception {
@@ -94,15 +91,9 @@ public class IoTDBSyncReceiverIT {
     FileUtils.moveDirectory(srcDir, tmpDir);
     EnvironmentUtils.cleanEnv();
     EnvironmentUtils.envSetUp();
-    try {
-      SyncService.getInstance().startPipeServer(true);
-      new Socket("localhost", 6670).close();
-    } catch (Exception e) {
-      Assert.fail("Failed to start pipe server because " + e.getMessage());
-    }
     Pipe pipe = new TsFilePipe(createdTime1, pipeName1, null, 0, false);
     remoteIp1 = "127.0.0.1";
-    client = new IoTDBSInkTransportClient(pipe, remoteIp1, 6670, "127.0.0.1");
+    client = new IoTDBSinkTransportClient(pipe, remoteIp1, 6667, "127.0.0.1");
     client.handshake();
   }
 
@@ -120,16 +111,6 @@ public class IoTDBSyncReceiverIT {
     EnvironmentUtils.cleanEnv();
   }
 
-  @Test
-  public void testStopPipeServer() {
-    logger.info("testStopPipeServerCheck");
-    try {
-      SyncService.getInstance().stopPipeServer();
-    } catch (PipeServerException e) {
-      Assert.fail("Can not stop pipe server");
-    }
-  }
-
   @Test
   public void testReceiveDataAndLoad() {
     logger.info("testReceiveDataAndLoad");
diff --git a/node-commons/pom.xml b/node-commons/pom.xml
index 21ae448050..737ade8e06 100644
--- a/node-commons/pom.xml
+++ b/node-commons/pom.xml
@@ -77,11 +77,6 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.iotdb</groupId>
-            <artifactId>iotdb-thrift</artifactId>
-            <version>${project.version}</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
             <artifactId>iotdb-thrift-confignode</artifactId>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
index 8abb84765f..bf9f4afb09 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java
@@ -72,12 +72,6 @@ public class SyncConstant {
   public static final int DATA_CHUNK_SIZE =
       Math.min(16 * 1024 * 1024, RpcUtils.THRIFT_FRAME_MAX_SIZE);
 
-  public static final int SUCCESS_CODE = 1;
-  public static final int ERROR_CODE = -1;
-  public static final int REBASE_CODE = -2;
-  public static final int RETRY_CODE = -3;
-  public static final int CONFLICT_CODE = -4;
-
   /** receiver */
   public static final String RECEIVER_DIR_NAME = "receiver";
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncPathUtil.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncPathUtil.java
index 323cc635b0..f10fc43f2d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncPathUtil.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncPathUtil.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.commons.sync;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.service.transport.thrift.IdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 
 import java.io.File;
 import java.io.IOException;
@@ -105,12 +105,12 @@ public class SyncPathUtil {
         + SyncConstant.FILE_DATA_DIR_NAME;
   }
 
-  public static String getFileDataDirPath(IdentityInfo identityInfo) {
+  public static String getFileDataDirPath(TSyncIdentityInfo identityInfo) {
     return SyncPathUtil.getReceiverFileDataDir(
         identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
   }
 
-  public static String getPipeLogDirPath(IdentityInfo identityInfo) {
+  public static String getPipeLogDirPath(TSyncIdentityInfo identityInfo) {
     return SyncPathUtil.getReceiverPipeLogDir(
         identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
   }
diff --git a/pom.xml b/pom.xml
index 198403d10c..d0f831bdbe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,7 +85,6 @@
         <module>thrift-commons</module>
         <module>thrift-confignode</module>
         <module>thrift-multi-leader-consensus</module>
-        <module>thrift-sync</module>
         <module>thrift-influxdb</module>
         <module>service-rpc</module>
         <module>jdbc</module>
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 9a70ba7680..e24b1d9ed8 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -675,10 +675,6 @@ timestamp_precision=ms
 ####################
 ### PIPE Server Configuration
 ####################
-# PIPE server port to listen
-# Datatype: int
-# pipe_server_port=6670
-
 # White IP list of Sync client.
 # Please use the form of network segment to present the range of IP, for example: 192.168.0.0/16
 # If there are more than one IP segment, please separate them by commas
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 2aa2b771ac..39bbe10647 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -501,9 +501,6 @@ public class IoTDBConfig {
    */
   private int externalSortThreshold = 1000;
 
-  /** If this IoTDB instance is a receiver of sync, set the server port. */
-  private int pipeServerPort = 6670;
-
   /** White list for sync */
   private String ipWhiteList = "0.0.0.0/0";
 
@@ -1441,14 +1438,6 @@ public class IoTDBConfig {
     this.mRemoteSchemaCacheSize = mRemoteSchemaCacheSize;
   }
 
-  public int getPipeServerPort() {
-    return pipeServerPort;
-  }
-
-  public void setPipeServerPort(int pipeServerPort) {
-    this.pipeServerPort = pipeServerPort;
-  }
-
   public int getMaxNumberOfSyncFileRetry() {
     return maxNumberOfSyncFileRetry;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index dec138deb1..dc0db70e5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -417,11 +417,6 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "session_timeout_threshold",
                   Integer.toString(conf.getSessionTimeoutThreshold()))));
-      conf.setPipeServerPort(
-          Integer.parseInt(
-              properties
-                  .getProperty("pipe_server_port", Integer.toString(conf.getPipeServerPort()))
-                  .trim()));
       conf.setMaxNumberOfSyncFileRetry(
           Integer.parseInt(
               properties
@@ -1417,10 +1412,6 @@ public class IoTDBDescriptor {
                   String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
 
       // update sync config
-      conf.setPipeServerPort(
-          Integer.parseInt(
-              properties.getProperty(
-                  "pipe_server_port", String.valueOf(conf.getPipeServerPort()))));
       conf.setMaxNumberOfSyncFileRetry(
           Integer.parseInt(
               properties
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 04f57311fb..96bdb1bc88 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -62,7 +62,6 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sync.PipeException;
-import org.apache.iotdb.db.exception.sync.PipeServerException;
 import org.apache.iotdb.db.exception.sync.PipeSinkException;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
@@ -133,7 +132,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowNodesInTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPathsSetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPathsUsingTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipeSinkPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
@@ -433,10 +431,6 @@ public class PlanExecutor implements IPlanExecutor {
       case DROP_PIPESINK:
         dropPipeSink((DropPipeSinkPlan) plan);
         return true;
-      case START_PIPE_SERVER:
-        return operateStartPipeServer();
-      case STOP_PIPE_SERVER:
-        return operateStopPipeServer();
       case CREATE_PIPE:
         createPipe((CreatePipePlan) plan);
         return true;
@@ -451,24 +445,6 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
-  private boolean operateStopPipeServer() throws QueryProcessException {
-    try {
-      SyncService.getInstance().stopPipeServer();
-    } catch (PipeServerException e) {
-      throw new QueryProcessException(e);
-    }
-    return true;
-  }
-
-  private boolean operateStartPipeServer() throws QueryProcessException {
-    try {
-      SyncService.getInstance().startPipeServer(false);
-    } catch (PipeServerException e) {
-      throw new QueryProcessException(e);
-    }
-    return true;
-  }
-
   private boolean createTemplate(CreateTemplatePlan createTemplatePlan)
       throws QueryProcessException {
     try {
@@ -764,8 +740,6 @@ public class PlanExecutor implements IPlanExecutor {
         return processShowPipeSink((ShowPipeSinkPlan) showPlan);
       case PIPESINKTYPE:
         return processShowPipeSinkType();
-      case PIPESERVER:
-        return processShowPipeServer((ShowPipeServerPlan) showPlan);
       case PIPE:
         return processShowPipes((ShowPipePlan) showPlan);
       default:
@@ -773,10 +747,6 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
-  private QueryDataSet processShowPipeServer(ShowPipeServerPlan plan) {
-    return SyncService.getInstance().showPipeServer(plan);
-  }
-
   private QueryDataSet processCountNodes(CountPlan countPlan) throws MetadataException {
     int num =
         getNodesNumInGivenLevel(
@@ -1344,7 +1314,6 @@ public class PlanExecutor implements IPlanExecutor {
                 TSDataType.TEXT,
                 TSDataType.TEXT));
     SyncService.getInstance().showPipe(plan, listDataSet);
-    SyncService.getInstance().showPipe(plan, listDataSet);
     // sort by create time
     listDataSet.sort(Comparator.comparing(o -> o.getFields().get(0).getStringValue()));
     return listDataSet;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 55cf2828b6..48fad9c0a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -205,8 +205,6 @@ public abstract class Operator {
     STOP_PIPE,
     START_PIPE,
     DROP_PIPE,
-    START_PIPE_SERVER,
-    STOP_PIPE_SERVER,
 
     ACTIVATE_TEMPLATE_IN_CLUSTER
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
deleted file mode 100644
index 310c909692..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.qp.logical.sys;
-
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
-import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
-
-public class ShowPipeServerOperator extends ShowOperator {
-
-  public ShowPipeServerOperator(int tokenIntType) {
-    super(tokenIntType);
-  }
-
-  @Override
-  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
-      throws QueryProcessException {
-    return new ShowPipeServerPlan();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java
deleted file mode 100644
index e23833a0dd..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.qp.logical.sys;
-
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.StartPipeServerPlan;
-import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
-
-public class StartPipeServerOperator extends Operator {
-  public StartPipeServerOperator(int tokenIntType) {
-    super(tokenIntType);
-    operatorType = OperatorType.START_PIPE_SERVER;
-  }
-
-  @Override
-  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
-      throws QueryProcessException {
-    return new StartPipeServerPlan();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java
deleted file mode 100644
index 76746c5fcb..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.qp.logical.sys;
-
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.StopPipeServerPlan;
-import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
-
-public class StopPipeServerOperator extends Operator {
-  public StopPipeServerOperator(int tokenIntType) {
-    super(tokenIntType);
-    operatorType = OperatorType.STOP_PIPE_SERVER;
-  }
-
-  @Override
-  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
-      throws QueryProcessException {
-    return new StopPipeServerPlan();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index d74059e946..e49be4304e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -70,9 +70,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.StartPipeServerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
-import org.apache.iotdb.db.qp.physical.sys.StopPipeServerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
@@ -490,12 +488,6 @@ public abstract class PhysicalPlan implements IConsensusRequest {
         case SET_SYSTEM_MODE:
           plan = new SetSystemModePlan();
           break;
-        case START_PIPE_SERVER:
-          plan = new StartPipeServerPlan();
-          break;
-        case STOP_PIPE_SERVER:
-          plan = new StopPipeServerPlan();
-          break;
         case ACTIVATE_TEMPLATE_IN_CLUSTER:
           plan = new ActivateTemplateInClusterPlan();
           break;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
deleted file mode 100644
index 67b664c1b7..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPipeServerPlan.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-public class ShowPipeServerPlan extends ShowPlan {
-
-  public ShowPipeServerPlan() {
-    super(ShowContentType.PIPESERVER);
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index c7c7b7a019..0398a2f014 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -130,6 +130,5 @@ public class ShowPlan extends PhysicalPlan {
     PIPESINK,
     PIPESINKTYPE,
     PIPE,
-    PIPESERVER
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java
deleted file mode 100644
index b3f2f8b4c6..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-public class StartPipeServerPlan extends PhysicalPlan {
-
-  public StartPipeServerPlan() {
-    super(Operator.OperatorType.START_PIPE_SERVER);
-    canBeSplit = false;
-  }
-
-  @Override
-  public List<? extends PartialPath> getPaths() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void serialize(DataOutputStream stream) throws IOException {
-    stream.writeByte((byte) PhysicalPlanType.START_PIPE_SERVER.ordinal());
-  }
-
-  @Override
-  public void serializeImpl(ByteBuffer buffer) {
-    buffer.put((byte) PhysicalPlanType.START_PIPE_SERVER.ordinal());
-  }
-
-  @Override
-  public void deserialize(ByteBuffer buffer) throws IllegalPathException {}
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java
deleted file mode 100644
index 2fe41ba880..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.qp.physical.sys;
-
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-public class StopPipeServerPlan extends PhysicalPlan {
-
-  public StopPipeServerPlan() {
-    super(Operator.OperatorType.STOP_PIPE_SERVER);
-    canBeSplit = false;
-  }
-
-  @Override
-  public List<? extends PartialPath> getPaths() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void serialize(DataOutputStream stream) throws IOException {
-    stream.writeByte((byte) PhysicalPlanType.STOP_PIPE_SERVER.ordinal());
-  }
-
-  @Override
-  public void serializeImpl(ByteBuffer buffer) {
-    buffer.put((byte) PhysicalPlanType.STOP_PIPE_SERVER.ordinal());
-  }
-
-  @Override
-  public void deserialize(ByteBuffer buffer) throws IllegalPathException {}
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index f348f44e9e..fd3ce0b205 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -133,7 +133,6 @@ import org.apache.iotdb.db.qp.logical.sys.ShowOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowPathsSetTemplateOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowPathsUsingTemplateOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowPipeOperator;
-import org.apache.iotdb.db.qp.logical.sys.ShowPipeServerOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowPipeSinkOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowPipeSinkTypeOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowQueryResourceOperate;
@@ -143,10 +142,8 @@ import org.apache.iotdb.db.qp.logical.sys.ShowTemplatesOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowTimeSeriesOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowTriggersOperator;
 import org.apache.iotdb.db.qp.logical.sys.StartPipeOperator;
-import org.apache.iotdb.db.qp.logical.sys.StartPipeServerOperator;
 import org.apache.iotdb.db.qp.logical.sys.StartTriggerOperator;
 import org.apache.iotdb.db.qp.logical.sys.StopPipeOperator;
-import org.apache.iotdb.db.qp.logical.sys.StopPipeServerOperator;
 import org.apache.iotdb.db.qp.logical.sys.StopTriggerOperator;
 import org.apache.iotdb.db.qp.logical.sys.UnSetTTLOperator;
 import org.apache.iotdb.db.qp.logical.sys.UnloadFileOperator;
@@ -2531,21 +2528,6 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
     }
   }
 
-  @Override
-  public Operator visitStartPipeServer(IoTDBSqlParser.StartPipeServerContext ctx) {
-    return new StartPipeServerOperator(SQLConstant.TOK_PIPE_SERVER_START);
-  }
-
-  @Override
-  public Operator visitStopPipeServer(IoTDBSqlParser.StopPipeServerContext ctx) {
-    return new StopPipeServerOperator(SQLConstant.TOK_PIPE_SERVER_STOP);
-  }
-
-  @Override
-  public Operator visitShowPipeServer(IoTDBSqlParser.ShowPipeServerContext ctx) {
-    return new ShowPipeServerOperator(SQLConstant.TOK_SHOW_PIPE_SERVER);
-  }
-
   /** 7. Common Clauses */
 
   // IoTDB Objects
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 3377225280..f8511ce804 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.db.query.control.SessionTimeoutManager;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.service.metrics.MetricService;
 import org.apache.iotdb.db.service.metrics.enums.Operation;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -103,6 +104,8 @@ import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
@@ -111,6 +114,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -1462,6 +1466,24 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public TSStatus handshake(TSyncIdentityInfo info) throws TException {
+    // TODO(sync): Check permissions here
+    return SyncService.getInstance().handshake(info);
+  }
+
+  @Override
+  public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
+      throws TException {
+    return SyncService.getInstance().transportData(metaInfo, buff, digest);
+  }
+
+  @Override
+  public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
+      throws TException {
+    return SyncService.getInstance().checkFileDigest(metaInfo, digest);
+  }
+
   @Override
   public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
     long t1 = System.currentTimeMillis();
@@ -1550,6 +1572,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
       closeSession(req);
     }
+    SyncService.getInstance().handleClientExit();
   }
 
   private void cleanupQueryExecution(Long queryId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index b32a23c28e..55aa206c5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -74,6 +74,7 @@ import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
 import org.apache.iotdb.db.service.metrics.MetricService;
 import org.apache.iotdb.db.service.metrics.enums.Operation;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -122,6 +123,8 @@ import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -1119,6 +1122,7 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
       TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
       closeSession(req);
     }
+    SyncService.getInstance().handleClientExit();
   }
 
   @Override
@@ -2103,6 +2107,23 @@ public class TSServiceImpl implements IClientRPCServiceWithHandler {
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
+  @Override
+  public TSStatus handshake(TSyncIdentityInfo info) throws TException {
+    return SyncService.getInstance().handshake(info);
+  }
+
+  @Override
+  public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
+      throws TException {
+    return SyncService.getInstance().transportData(metaInfo, buff, digest);
+  }
+
+  @Override
+  public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
+      throws TException {
+    return SyncService.getInstance().checkFileDigest(metaInfo, digest);
+  }
+
   protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
     try {
       return serviceProvider.executeNonQuery(plan)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index f78978855d..46eaa93eab 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -23,18 +23,15 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.ShutdownException;
 import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.commons.sync.SyncPathUtil;
 import org.apache.iotdb.db.exception.sync.PipeException;
-import org.apache.iotdb.db.exception.sync.PipeServerException;
 import org.apache.iotdb.db.exception.sync.PipeSinkException;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
 import org.apache.iotdb.db.query.dataset.ListDataSet;
 import org.apache.iotdb.db.sync.common.ISyncInfoFetcher;
@@ -50,27 +47,26 @@ import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
 import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
 import org.apache.iotdb.db.sync.sender.service.TransportHandler;
-import org.apache.iotdb.db.sync.transport.server.TransportServerManager;
+import org.apache.iotdb.db.sync.transport.server.ReceiverManager;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESERVER_STATUS;
-
 public class SyncService implements IService {
   private static final Logger logger = LoggerFactory.getLogger(SyncService.class);
 
@@ -83,7 +79,12 @@ public class SyncService implements IService {
 
   private ISyncInfoFetcher syncInfoFetcher = LocalSyncInfoFetcher.getInstance();
 
-  private SyncService() {}
+  /* handle rpc in receiver-side*/
+  private ReceiverManager receiverManager;
+
+  private SyncService() {
+    receiverManager = new ReceiverManager();
+  }
 
   private static class SyncServiceHolder {
     private static final SyncService INSTANCE = new SyncService();
@@ -95,6 +96,30 @@ public class SyncService implements IService {
     return SyncServiceHolder.INSTANCE;
   }
 
+  // region Interfaces and Implementation of Transport Layer
+
+  public TSStatus handshake(TSyncIdentityInfo identityInfo) {
+    return receiverManager.handshake(identityInfo);
+  }
+
+  public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
+      throws TException {
+    return receiverManager.transportData(metaInfo, buff, digest);
+  }
+
+  // TODO: this will be deleted later
+  public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
+      throws TException {
+    return receiverManager.checkFileDigest(metaInfo, digest);
+  }
+
+  public void handleClientExit() {
+    // Handle client exit here.
+    receiverManager.handleClientExit();
+  }
+
+  // endregion
+
   // region Interfaces and Implementation of PipeSink
 
   public PipeSink getPipeSink(String name) {
@@ -121,60 +146,6 @@ public class SyncService implements IService {
 
   // endregion
 
-  // region Interfaces and Implementation of PipeServer
-
-  /**
-   * start receiver service
-   *
-   * @param isRecovery if isRecovery, it will ignore check and force a start
-   */
-  public synchronized void startPipeServer(boolean isRecovery) throws PipeServerException {
-    if (syncInfoFetcher.isPipeServerEnable() && !isRecovery) {
-      return;
-    }
-    try {
-      TransportServerManager.getInstance().startService();
-      TSStatus status = syncInfoFetcher.startPipeServer();
-      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        throw new PipeServerException("Failed to start pipe server because " + status.getMessage());
-      }
-    } catch (StartupException e) {
-      throw new PipeServerException("Failed to start pipe server because " + e.getMessage());
-    }
-  }
-
-  /** stop receiver service */
-  public synchronized void stopPipeServer() throws PipeServerException {
-    if (!syncInfoFetcher.isPipeServerEnable()) {
-      return;
-    }
-    TransportServerManager.getInstance().stopService();
-    TSStatus status = syncInfoFetcher.stopPipeServer();
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeServerException("Failed to stop pipe server because " + status.getMessage());
-    }
-  }
-
-  /**
-   * query by sql SHOW PIPESERVER STATUS
-   *
-   * @return QueryDataSet contained one column: enable
-   */
-  public QueryDataSet showPipeServer(ShowPipeServerPlan plan) {
-    ListDataSet dataSet =
-        new ListDataSet(
-            Collections.singletonList(new PartialPath(COLUMN_PIPESERVER_STATUS, false)),
-            Collections.singletonList(TSDataType.BOOLEAN));
-    RowRecord rowRecord = new RowRecord(0);
-    Field status = new Field(TSDataType.BOOLEAN);
-    status.setBoolV(syncInfoFetcher.isPipeServerEnable());
-    rowRecord.addField(status);
-    dataSet.putRecord(rowRecord);
-    return dataSet;
-  }
-
-  // endregion
-
   // region Interfaces and Implementation of Pipe
 
   public synchronized void addPipe(CreatePipePlan plan) throws PipeException {
@@ -334,6 +305,7 @@ public class SyncService implements IService {
 
   public void showPipe(ShowPipePlan plan, ListDataSet listDataSet) {
     boolean showAll = "".equals(plan.getPipeName());
+    // show pipe in sender
     for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
       if (showAll || plan.getPipeName().equals(pipe.getPipeName())) {
         RowRecord record = new RowRecord(0);
@@ -376,7 +348,23 @@ public class SyncService implements IService {
         listDataSet.putRecord(record);
       }
     }
-    // TODO: implement show pipe in receiver
+    // show pipe in receiver
+    List<TSyncIdentityInfo> identityInfoList = receiverManager.getAllTSyncIdentityInfos();
+    for (TSyncIdentityInfo identityInfo : identityInfoList) {
+      // TODO(sync): Removing duplicate rows
+      RowRecord record = new RowRecord(0);
+      record.addField(
+          Binary.valueOf(DatetimeUtils.convertLongToDate(identityInfo.getCreateTime())),
+          TSDataType.TEXT);
+      record.addField(Binary.valueOf(identityInfo.getPipeName()), TSDataType.TEXT);
+      record.addField(Binary.valueOf(IoTDBConstant.SYNC_RECEIVER_ROLE), TSDataType.TEXT);
+      record.addField(Binary.valueOf(identityInfo.getAddress()), TSDataType.TEXT);
+      record.addField(Binary.valueOf(Pipe.PipeStatus.RUNNING.name()), TSDataType.TEXT);
+      record.addField(Binary.valueOf(""), TSDataType.TEXT);
+      record.addField(Binary.valueOf("N/A"), TSDataType.TEXT);
+      record.addField(Binary.valueOf("N/A"), TSDataType.TEXT);
+      listDataSet.putRecord(record);
+    }
   }
 
   // endregion
@@ -432,15 +420,6 @@ public class SyncService implements IService {
   /** IService * */
   @Override
   public void start() throws StartupException {
-    // recover receiver
-    if (syncInfoFetcher.isPipeServerEnable()) {
-      try {
-        startPipeServer(true);
-      } catch (PipeServerException e) {
-        throw new StartupException(e.getMessage());
-      }
-    }
-    // recover sender
     // == Check whether loading extPipe plugin successfully.
     ExtPipePluginRegister extPipePluginRegister = ExtPipePluginRegister.getInstance();
     if (extPipePluginRegister == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
index b1bf9e1c90..f13483444d 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
@@ -29,14 +29,6 @@ import java.util.List;
 
 public interface ISyncInfoFetcher {
 
-  // region Interfaces of PipeServer
-  TSStatus startPipeServer();
-
-  TSStatus stopPipeServer();
-
-  boolean isPipeServerEnable();
-  // endregion
-
   // region Interfaces of PipeSink
   // TODO: use CreatePipeSinkNode as parameter
   TSStatus addPipeSink(CreatePipeSinkPlan plan);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index 93e12d4c38..e74c6748ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -46,34 +46,6 @@ public class LocalSyncInfoFetcher implements ISyncInfoFetcher {
     syncInfo = new SyncInfo();
   }
 
-  // region Interfaces of PipeServer
-  @Override
-  public TSStatus startPipeServer() {
-    try {
-      syncInfo.startServer();
-    } catch (IOException e) {
-      RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
-    }
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  @Override
-  public TSStatus stopPipeServer() {
-    try {
-      syncInfo.stopServer();
-    } catch (IOException e) {
-      RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
-    }
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  @Override
-  public boolean isPipeServerEnable() {
-    return syncInfo.isPipeServerEnable();
-  }
-
-  // endregion
-
   // region Implement of PipeSink
   @Override
   public TSStatus addPipeSink(CreatePipeSinkPlan plan) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
index 8f00ca3ec9..936d4a7ea7 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
@@ -47,7 +47,6 @@ public class SyncInfo {
 
   protected static final Logger LOGGER = LoggerFactory.getLogger(SyncInfo.class);
 
-  protected boolean pipeServerEnable;
   // <pipeFolderName, pipeMsg>
   protected Map<String, List<PipeMessage>> pipeMessageMap;
 
@@ -60,21 +59,19 @@ public class SyncInfo {
 
   public SyncInfo() {
     syncLogWriter = SyncLogWriter.getInstance();
-    SyncLogReader analyzer = new SyncLogReader();
+    SyncLogReader logReader = new SyncLogReader();
     try {
-      analyzer.recover();
-      pipeSinks = analyzer.getAllPipeSinks();
-      pipes = analyzer.getAllPipeInfos();
-      runningPipe = analyzer.getRunningPipeInfo();
-      pipeServerEnable = analyzer.isPipeServerEnable();
-      pipeMessageMap = analyzer.getPipeMessageMap();
+      logReader.recover();
+      pipeSinks = logReader.getAllPipeSinks();
+      pipes = logReader.getAllPipeInfos();
+      runningPipe = logReader.getRunningPipeInfo();
+      pipeMessageMap = logReader.getPipeMessageMap();
     } catch (StartupException e) {
       LOGGER.error(
           "Cannot recover ReceiverInfo because {}. Use default info values.", e.getMessage());
       pipeSinks = new ConcurrentHashMap<>();
       pipes = new ArrayList<>();
       pipeMessageMap = new ConcurrentHashMap<>();
-      pipeServerEnable = false;
     }
   }
 
@@ -82,20 +79,6 @@ public class SyncInfo {
     syncLogWriter.close();
   }
 
-  // region Implement of PipeServer
-
-  public void startServer() throws IOException {
-    pipeServerEnable = true;
-    syncLogWriter.startPipeServer();
-  }
-
-  public void stopServer() throws IOException {
-    pipeServerEnable = false;
-    syncLogWriter.stopPipeServer();
-  }
-
-  // endregion
-
   // region Implement of PipeSink
   private boolean isPipeSinkExist(String name) {
     return pipeSinks.containsKey(name);
@@ -280,10 +263,6 @@ public class SyncInfo {
     return message;
   }
 
-  public boolean isPipeServerEnable() {
-    return pipeServerEnable;
-  }
-
   private void createDir(String pipeName, String remoteIp, long createTime) {
     File f = new File(SyncPathUtil.getReceiverFileDataDir(pipeName, remoteIp, createTime));
     if (!f.exists()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
index 66f8fe3a03..ac5cd39397 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
@@ -44,8 +44,6 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class SyncLogReader {
   private static final Logger logger = LoggerFactory.getLogger(SyncLogReader.class);
-  // record recovery result of receiver server status
-  private boolean pipeServerEnable = false;
   // <pipeFolderName, pipeMsg>
   private Map<String, List<PipeMessage>> pipeMessageMap = new ConcurrentHashMap<>();
   // <pipeSinkName, PipeSink>
@@ -56,7 +54,6 @@ public class SyncLogReader {
   public void recover() throws StartupException {
     logger.info("Start to recover all sync state for sync.");
     this.pipeMessageMap = new ConcurrentHashMap<>();
-    this.pipeServerEnable = false;
     this.pipeSinks = new ConcurrentHashMap<>();
     this.pipes = new ArrayList<>();
     File serviceLogFile = new File(SyncPathUtil.getSysDir(), SyncConstant.SYNC_LOG_NAME);
@@ -86,10 +83,6 @@ public class SyncLogReader {
     }
   }
 
-  public boolean isPipeServerEnable() {
-    return pipeServerEnable;
-  }
-
   public Map<String, List<PipeMessage>> getPipeMessageMap() {
     return pipeMessageMap;
   }
@@ -149,12 +142,6 @@ public class SyncLogReader {
           case DROP_PIPE:
             runningPipe.drop();
             break;
-          case START_PIPE_SERVER:
-            pipeServerEnable = true;
-            break;
-          case STOP_PIPE_SERVER:
-            pipeServerEnable = false;
-            break;
           default:
             throw new UnsupportedOperationException(
                 String.format("Can not recognize type %s.", type.name()));
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
index 8cda3e7669..3408d2d5ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogWriter.java
@@ -54,20 +54,6 @@ public class SyncLogWriter {
     }
   }
 
-  public void startPipeServer() throws IOException {
-    getBufferedWriter();
-    pipeInfoWriter.write(Operator.OperatorType.START_PIPE_SERVER.name());
-    pipeInfoWriter.newLine();
-    pipeInfoWriter.flush();
-  }
-
-  public void stopPipeServer() throws IOException {
-    getBufferedWriter();
-    pipeInfoWriter.write(Operator.OperatorType.STOP_PIPE_SERVER.name());
-    pipeInfoWriter.newLine();
-    pipeInfoWriter.flush();
-  }
-
   public synchronized void addPipeSink(CreatePipeSinkPlan plan) throws IOException {
     getBufferedWriter();
     pipeInfoWriter.write(Operator.OperatorType.CREATE_PIPESINK.name());
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
index 6fb16770a1..d8f96ac28e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.transport.client.ITransportClient;
-import org.apache.iotdb.db.sync.transport.client.IoTDBSInkTransportClient;
+import org.apache.iotdb.db.sync.transport.client.IoTDBSinkTransportClient;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,7 +63,7 @@ public class TransportHandler {
 
     this.localIP = getLocalIP(pipeSink);
     this.transportClient =
-        new IoTDBSInkTransportClient(pipe, pipeSink.getIp(), pipeSink.getPort(), localIP);
+        new IoTDBSinkTransportClient(pipe, pipeSink.getIp(), pipeSink.getPort(), localIP);
   }
 
   private String getLocalIP(IoTDBPipeSink pipeSink) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
index 670c3b6ad6..b96d6b3ead 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/ClientWrapper.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.sync.transport.client;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -25,9 +26,9 @@ import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
-import org.apache.iotdb.service.transport.thrift.IdentityInfo;
-import org.apache.iotdb.service.transport.thrift.TransportService;
-import org.apache.iotdb.service.transport.thrift.TransportStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -38,14 +39,12 @@ import org.apache.thrift.transport.TTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.iotdb.commons.sync.SyncConstant.SUCCESS_CODE;
-
 public class ClientWrapper {
   private static final Logger logger = LoggerFactory.getLogger(ClientWrapper.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private TTransport transport = null;
-  private volatile TransportService.Client serviceClient = null;
+  private volatile IClientRPCService.Client serviceClient = null;
 
   /* remote IP address*/
   private final String ipAddress;
@@ -63,7 +62,7 @@ public class ClientWrapper {
     this.localIP = localIP;
   }
 
-  public TransportService.Client getClient() {
+  public IClientRPCService.Client getClient() {
     return serviceClient;
   }
 
@@ -93,19 +92,19 @@ public class ClientWrapper {
       } else {
         protocol = new TBinaryProtocol(transport);
       }
-      serviceClient = new TransportService.Client(protocol);
+      serviceClient = new IClientRPCService.Client(protocol);
 
       // Underlay socket open.
       if (!transport.isOpen()) {
         transport.open();
       }
 
-      IdentityInfo identityInfo =
-          new IdentityInfo(
+      TSyncIdentityInfo identityInfo =
+          new TSyncIdentityInfo(
               localIP, pipe.getName(), pipe.getCreateTime(), config.getIoTDBMajorVersion());
-      TransportStatus status = serviceClient.handshake(identityInfo);
-      if (status.code != SUCCESS_CODE) {
-        logger.error("The receiver rejected the synchronization task because {}", status.msg);
+      TSStatus status = serviceClient.handshake(identityInfo);
+      if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        logger.error("The receiver rejected the synchronization task because {}", status.message);
         return false;
       }
     } catch (TException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSInkTransportClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSInkTransportClient.java
rename to server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
index b68cd44cf4..14c5da4b50 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSInkTransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSinkTransportClient.java
@@ -19,6 +19,7 @@
  */
 package org.apache.iotdb.db.sync.transport.client;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -29,9 +30,9 @@ import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.service.transport.thrift.MetaInfo;
-import org.apache.iotdb.service.transport.thrift.TransportStatus;
-import org.apache.iotdb.service.transport.thrift.Type;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportType;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -49,14 +50,11 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 
 import static org.apache.iotdb.commons.sync.SyncConstant.DATA_CHUNK_SIZE;
-import static org.apache.iotdb.commons.sync.SyncConstant.REBASE_CODE;
-import static org.apache.iotdb.commons.sync.SyncConstant.RETRY_CODE;
-import static org.apache.iotdb.commons.sync.SyncConstant.SUCCESS_CODE;
 import static org.apache.iotdb.db.sync.transport.conf.TransportConfig.isCheckFileDegistAgain;
 
-public class IoTDBSInkTransportClient implements ITransportClient {
+public class IoTDBSinkTransportClient implements ITransportClient {
 
-  private static final Logger logger = LoggerFactory.getLogger(IoTDBSInkTransportClient.class);
+  private static final Logger logger = LoggerFactory.getLogger(IoTDBSinkTransportClient.class);
 
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
@@ -80,7 +78,7 @@ public class IoTDBSInkTransportClient implements ITransportClient {
    * @param port remote port
    * @param localIP local ip address
    */
-  public IoTDBSInkTransportClient(Pipe pipe, String ipAddress, int port, String localIP) {
+  public IoTDBSinkTransportClient(Pipe pipe, String ipAddress, int port, String localIP) {
     RpcTransportFactory.setThriftMaxFrameSize(config.getThriftMaxFrameSize());
     this.pipe = pipe;
     this.ipAddress = ipAddress;
@@ -258,9 +256,10 @@ public class IoTDBSInkTransportClient implements ITransportClient {
           messageDigest.reset();
           messageDigest.update(buffer, 0, dataLength);
           ByteBuffer buffToSend = ByteBuffer.wrap(buffer, 0, dataLength);
-          MetaInfo metaInfo = new MetaInfo(Type.FILE, file.getName(), position);
+          TSyncTransportMetaInfo metaInfo =
+              new TSyncTransportMetaInfo(TSyncTransportType.FILE, file.getName(), position);
 
-          TransportStatus status = null;
+          TSStatus status = null;
           int retryCount = 0;
           while (true) {
             retryCount++;
@@ -283,21 +282,21 @@ public class IoTDBSInkTransportClient implements ITransportClient {
             break;
           }
 
-          if (status.code == REBASE_CODE) {
-            position = Long.parseLong(status.msg);
+          if (status.code == TSStatusCode.SYNC_FILE_REBASE.getStatusCode()) {
+            position = Long.parseLong(status.message);
             continue outer;
-          } else if (status.code == RETRY_CODE) {
+          } else if (status.code == TSStatusCode.SYNC_FILE_RETRY.getStatusCode()) {
             logger.info(
                 "Receiver failed to receive data from {} because {}, retry.",
                 file.getAbsoluteFile(),
-                status.msg);
+                status.message);
             continue outer;
-          } else if (status.code != SUCCESS_CODE) {
+          } else if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             logger.info(
                 "Receiver failed to receive data from {} because {}, abort.",
                 file.getAbsoluteFile(),
-                status.msg);
-            throw new SyncConnectionException(status.msg);
+                status.message);
+            throw new SyncConnectionException(status.message);
           } else { // Success
             position += dataLength;
             if (position >= limit) {
@@ -339,8 +338,10 @@ public class IoTDBSInkTransportClient implements ITransportClient {
       }
     }
 
-    MetaInfo metaInfo = new MetaInfo(Type.FILE, file.getName(), 0);
-    TransportStatus status;
+    TSyncTransportMetaInfo metaInfo =
+        new TSyncTransportMetaInfo(TSyncTransportType.FILE, file.getName(), 0);
+
+    TSStatus status;
     int retryCount = 0;
 
     while (true) {
@@ -364,7 +365,7 @@ public class IoTDBSInkTransportClient implements ITransportClient {
       break;
     }
 
-    if (status.code != SUCCESS_CODE) {
+    if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       logger.error("Digest check of tsfile {} failed, retry", file.getAbsoluteFile());
       return false;
     }
@@ -392,14 +393,15 @@ public class IoTDBSInkTransportClient implements ITransportClient {
         messageDigest.update(buffer);
         ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
 
-        MetaInfo metaInfo =
-            new MetaInfo(Type.findByValue(pipeData.getType().ordinal()), "fileName", 0);
-        TransportStatus status =
+        TSyncTransportMetaInfo metaInfo =
+            new TSyncTransportMetaInfo(
+                TSyncTransportType.findByValue(pipeData.getType().ordinal()), "fileName", 0);
+        TSStatus status =
             serviceClient
                 .getClient()
                 .transportData(metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
 
-        if (status.code == SUCCESS_CODE) {
+        if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           break;
         } else {
           logger.error("Digest check of pipeData failed, retry");
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
similarity index 72%
rename from server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
rename to server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
index b4eabd3b2d..2a970a67b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
@@ -19,6 +19,7 @@
  */
 package org.apache.iotdb.db.sync.transport.server;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.commons.sync.SyncPathUtil;
@@ -27,11 +28,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.sync.PipeDataLoadException;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
-import org.apache.iotdb.service.transport.thrift.IdentityInfo;
-import org.apache.iotdb.service.transport.thrift.MetaInfo;
-import org.apache.iotdb.service.transport.thrift.TransportService;
-import org.apache.iotdb.service.transport.thrift.TransportStatus;
-import org.apache.iotdb.service.transport.thrift.Type;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportType;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -52,31 +53,42 @@ import java.nio.file.StandardCopyOption;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.text.DecimalFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.iotdb.commons.sync.SyncConstant.CONFLICT_CODE;
 import static org.apache.iotdb.commons.sync.SyncConstant.DATA_CHUNK_SIZE;
-import static org.apache.iotdb.commons.sync.SyncConstant.ERROR_CODE;
-import static org.apache.iotdb.commons.sync.SyncConstant.REBASE_CODE;
-import static org.apache.iotdb.commons.sync.SyncConstant.RETRY_CODE;
-import static org.apache.iotdb.commons.sync.SyncConstant.SUCCESS_CODE;
 
-public class TransportServiceImpl implements TransportService.Iface {
-  private static Logger logger = LoggerFactory.getLogger(TransportServiceImpl.class);
+/**
+ * This class is responsible for implementing the RPC processing on the receiver-side. It should
+ * only be accessed by the {@linkplain org.apache.iotdb.db.sync.SyncService}
+ */
+public class ReceiverManager {
+  private static Logger logger = LoggerFactory.getLogger(ReceiverManager.class);
 
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final String RECORD_SUFFIX = ".record";
   private static final String PATCH_SUFFIX = ".patch";
-  private final ThreadLocal<IdentityInfo> identityInfoThreadLocal;
-  private final Map<IdentityInfo, Integer> identityInfoCounter;
 
-  public TransportServiceImpl() {
-    identityInfoThreadLocal = new ThreadLocal<>();
-    identityInfoCounter = new ConcurrentHashMap<>();
+  // When the client abnormally exits, we can still know who to disconnect
+  private final ThreadLocal<Long> currentConnectionId;
+  // Record the remote message for every rpc connection
+  private final Map<Long, TSyncIdentityInfo> connectionIdToIdentityInfoMap;
+
+  // The sync connectionId is unique in one IoTDB instance.
+  private final AtomicLong connectionIdGenerator;
+
+  public ReceiverManager() {
+    currentConnectionId = new ThreadLocal<>();
+    connectionIdToIdentityInfoMap = new ConcurrentHashMap<>();
+    connectionIdGenerator = new AtomicLong();
   }
 
+  // region Interfaces and Implementation of Index Checker
+
   private class CheckResult {
     boolean result;
     String index;
@@ -138,21 +150,22 @@ public class TransportServiceImpl implements TransportService.Iface {
     return new CheckResult(true, "0");
   }
 
-  @Override
-  public TransportStatus handshake(IdentityInfo identityInfo) throws TException {
+  // endregion
+
+  // region Interfaces and Implementation of RPC Handler
+
+  public TSStatus handshake(TSyncIdentityInfo identityInfo) {
     logger.debug("Invoke handshake method from client ip = {}", identityInfo.address);
-    identityInfoThreadLocal.set(identityInfo);
-    identityInfoCounter.compute(identityInfo, (k, v) -> v == null ? 1 : v + 1);
     // check ip address
     if (!verifyIPSegment(config.getIpWhiteList(), identityInfo.address)) {
-      return new TransportStatus(
-          ERROR_CODE,
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPESERVER_ERROR,
           "Sender IP is not in the white list of receiver IP and synchronization tasks are not allowed.");
     }
     // Version check
     if (!config.getIoTDBMajorVersion(identityInfo.version).equals(config.getIoTDBMajorVersion())) {
-      return new TransportStatus(
-          ERROR_CODE,
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPESERVER_ERROR,
           String.format(
               "Version mismatch: the sender <%s>, the receiver <%s>",
               identityInfo.version, config.getIoTDBVersion()));
@@ -161,7 +174,8 @@ public class TransportServiceImpl implements TransportService.Iface {
     if (!new File(SyncPathUtil.getFileDataDirPath(identityInfo)).exists()) {
       new File(SyncPathUtil.getFileDataDirPath(identityInfo)).mkdirs();
     }
-    return new TransportStatus(SUCCESS_CODE, "");
+    createConnection(identityInfo);
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
 
   /**
@@ -204,26 +218,29 @@ public class TransportServiceImpl implements TransportService.Iface {
     return ipAddressBinary.equals(ipSegmentBinary);
   }
 
-  @Override
-  public TransportStatus transportData(MetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest) {
-    IdentityInfo identityInfo = identityInfoThreadLocal.get();
+  public TSStatus transportData(TSyncTransportMetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest)
+      throws TException {
+    TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
+    if (identityInfo == null) {
+      throw new TException("Thrift connection is not alive.");
+    }
     logger.debug("Invoke transportData method from client ip = {}", identityInfo.address);
 
     String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
-    Type type = metaInfo.type;
+    TSyncTransportType type = metaInfo.type;
     String fileName = metaInfo.fileName;
     long startIndex = metaInfo.startIndex;
 
     // Check file start index valid
-    if (type == Type.FILE) {
+    if (type == TSyncTransportType.FILE) {
       try {
         CheckResult result = checkStartIndexValid(new File(fileDir, fileName), startIndex);
         if (!result.isResult()) {
-          return new TransportStatus(REBASE_CODE, result.getIndex());
+          return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REBASE, result.getIndex());
         }
       } catch (IOException e) {
         logger.error(e.getMessage());
-        return new TransportStatus(ERROR_CODE, e.getMessage());
+        return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
       }
     }
 
@@ -234,23 +251,23 @@ public class TransportServiceImpl implements TransportService.Iface {
       messageDigest = MessageDigest.getInstance("SHA-256");
     } catch (NoSuchAlgorithmException e) {
       logger.error(e.getMessage());
-      return new TransportStatus(ERROR_CODE, e.getMessage());
+      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
     }
     messageDigest.update(buff);
     byte[] digestBytes = new byte[digest.capacity()];
     digest.get(digestBytes);
     if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
-      return new TransportStatus(RETRY_CODE, "Data digest check error, retry.");
+      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_RETRY, "Data digest check error");
     }
 
-    if (type != Type.FILE) {
+    if (type != TSyncTransportType.FILE) {
       buff.position(pos);
       int length = buff.capacity();
       byte[] byteArray = new byte[length];
       buff.get(byteArray);
       try {
         PipeData pipeData = PipeData.createPipeData(byteArray);
-        if (type == Type.TSFILE) {
+        if (type == TSyncTransportType.TSFILE) {
           // Do with file
           handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
         }
@@ -264,10 +281,12 @@ public class TransportServiceImpl implements TransportService.Iface {
             "Load pipeData with serialize number {} successfully.", pipeData.getSerialNumber());
       } catch (IOException | IllegalPathException e) {
         logger.error("Pipe data transport error, {}", e.getMessage());
-        return new TransportStatus(RETRY_CODE, "Data digest transport error " + e.getMessage());
+        return RpcUtils.getStatus(
+            TSStatusCode.SYNC_FILE_RETRY, "Data digest transport error " + e.getMessage());
       } catch (PipeDataLoadException e) {
         logger.error("Fail to load pipeData because {}.", e.getMessage());
-        return new TransportStatus(ERROR_CODE, "Fail to load pipeData because " + e.getMessage());
+        return RpcUtils.getStatus(
+            TSStatusCode.SYNC_FILE_ERROR, "Fail to load pipeData because " + e.getMessage());
       }
     } else {
       // Write buff to {file}.patch
@@ -290,15 +309,18 @@ public class TransportServiceImpl implements TransportService.Iface {
                 + " is done.");
       } catch (IOException e) {
         logger.error(e.getMessage());
-        return new TransportStatus(ERROR_CODE, e.getMessage());
+        return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
       }
     }
-    return new TransportStatus(SUCCESS_CODE, "");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
 
-  @Override
-  public TransportStatus checkFileDigest(MetaInfo metaInfo, ByteBuffer digest) throws TException {
-    IdentityInfo identityInfo = identityInfoThreadLocal.get();
+  public TSStatus checkFileDigest(TSyncTransportMetaInfo metaInfo, ByteBuffer digest)
+      throws TException {
+    TSyncIdentityInfo identityInfo = getCurrentTSyncIdentityInfo();
+    if (identityInfo == null) {
+      throw new TException("Thrift connection is not alive.");
+    }
     logger.debug("Invoke checkFileDigest method from client ip = {}", identityInfo.address);
     String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
     synchronized (fileDir.intern()) {
@@ -308,7 +330,7 @@ public class TransportServiceImpl implements TransportService.Iface {
         messageDigest = MessageDigest.getInstance("SHA-256");
       } catch (NoSuchAlgorithmException e) {
         logger.error(e.getMessage());
-        return new TransportStatus(ERROR_CODE, e.getMessage());
+        return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, e.getMessage());
       }
 
       try (InputStream inputStream =
@@ -330,14 +352,14 @@ public class TransportServiceImpl implements TransportService.Iface {
               localDigest,
               digest);
           new File(fileDir, fileName + RECORD_SUFFIX).delete();
-          return new TransportStatus(CONFLICT_CODE, "File digest check error.");
+          return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, "File digest check error.");
         }
       } catch (IOException e) {
         logger.error(e.getMessage());
-        return new TransportStatus(ERROR_CODE, e.getMessage());
+        return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, e.getMessage());
       }
 
-      return new TransportStatus(SUCCESS_CODE, "");
+      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
     }
   }
 
@@ -349,25 +371,6 @@ public class TransportServiceImpl implements TransportService.Iface {
     Files.move(tmpFile.toPath(), recordFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
   }
 
-  /**
-   * release resources or cleanup when a client (a sender) is disconnected (normally or abnormally).
-   */
-  public void handleClientExit() {
-    // Handle client exit here.
-    IdentityInfo identityInfo = identityInfoThreadLocal.get();
-    if (identityInfo != null) {
-      // if all connections exit, stop pipe
-      identityInfoThreadLocal.remove();
-      synchronized (identityInfoCounter) {
-        identityInfoCounter.compute(identityInfo, (k, v) -> v == null ? 0 : v - 1);
-        if (identityInfoCounter.get(identityInfo) == 0) {
-          identityInfoCounter.remove(identityInfo);
-          // TODO:发送端退出
-        }
-      }
-    }
-  }
-
   /**
    * handle when successfully receive tsFilePipeData. Rename .patch file and reset tsFilePipeData's
    * path.
@@ -400,4 +403,50 @@ public class TransportServiceImpl implements TransportService.Iface {
           String.format("Delete record file %s error, because %s.", recordFile.getPath(), e));
     }
   }
+
+  // endregion
+
+  // region Interfaces and Implementation of Connection Manager
+
+  /** Check if the connection is legally established by handshaking */
+  private boolean checkConnection() {
+    return currentConnectionId.get() != null;
+  }
+
+  /**
+   * Get current TSyncIdentityInfo
+   *
+   * @return null if connection has been exited
+   */
+  private TSyncIdentityInfo getCurrentTSyncIdentityInfo() {
+    Long id = currentConnectionId.get();
+    if (id != null) {
+      return connectionIdToIdentityInfoMap.get(id);
+    } else {
+      return null;
+    }
+  }
+
+  private void createConnection(TSyncIdentityInfo identityInfo) {
+    long connectionId = connectionIdGenerator.incrementAndGet();
+    currentConnectionId.set(connectionId);
+    connectionIdToIdentityInfoMap.put(connectionId, identityInfo);
+  }
+
+  /**
+   * release resources or cleanup when a client (a sender) is disconnected (normally or abnormally).
+   */
+  public void handleClientExit() {
+    if (checkConnection()) {
+      long id = currentConnectionId.get();
+      connectionIdToIdentityInfoMap.remove(id);
+      currentConnectionId.remove();
+    }
+  }
+
+  public List<TSyncIdentityInfo> getAllTSyncIdentityInfos() {
+    return new ArrayList<>(connectionIdToIdentityInfoMap.values());
+  }
+
+  // endregion
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManager.java
deleted file mode 100644
index c0663f8d13..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManager.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.transport.server;
-
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.service.AbstractThriftServiceThread;
-import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.commons.service.ThriftService;
-import org.apache.iotdb.commons.service.ThriftServiceThread;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.service.metrics.MetricService;
-import org.apache.iotdb.db.service.metrics.enums.Metric;
-import org.apache.iotdb.db.service.metrics.enums.Tag;
-import org.apache.iotdb.metrics.utils.MetricLevel;
-import org.apache.iotdb.service.transport.thrift.TransportService;
-
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TransportServerManager extends ThriftService
-    implements Runnable, TransportServerManagerMBean {
-
-  private static final Logger logger = LoggerFactory.getLogger(TransportServerManager.class);
-  private TransportServiceImpl serviceImpl;
-
-  @Override
-  public void run() {
-    TransportServerManager serverManager = new TransportServerManager();
-    try {
-      serverManager.start();
-    } catch (StartupException e) {
-      e.printStackTrace();
-    }
-  }
-
-  private static class ServiceManagerHolder {
-    private static final TransportServerManager INSTANCE = new TransportServerManager();
-  }
-
-  public static TransportServerManager getInstance() {
-    return TransportServerManager.ServiceManagerHolder.INSTANCE;
-  }
-
-  @Override
-  public ServiceType getID() {
-    return ServiceType.SYNC_RPC_SERVICE;
-  }
-
-  @Override
-  public void initTProcessor() {
-    initSyncedServiceImpl(null);
-    serviceImpl = new TransportServiceImpl();
-    processor = new TransportService.Processor<>(serviceImpl);
-  }
-
-  @Override
-  public void initThriftServiceThread() {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    thriftServiceThread =
-        new ThriftServiceThread(
-            processor,
-            getID().getName(),
-            ThreadName.SYNC_CLIENT.getName(),
-            config.getRpcAddress(),
-            config.getPipeServerPort(),
-            Integer.MAX_VALUE,
-            config.getThriftServerAwaitTimeForStopService(),
-            new TransportServerThriftHandler(serviceImpl),
-            config.isRpcThriftCompressionEnable());
-    thriftServiceThread.setName(ThreadName.SYNC_SERVER.getName());
-    MetricService.getInstance()
-        .getOrCreateAutoGauge(
-            Metric.THRIFT_ACTIVE_THREADS.toString(),
-            MetricLevel.CORE,
-            thriftServiceThread,
-            AbstractThriftServiceThread::getActiveThreadCount,
-            Tag.NAME.toString(),
-            ThreadName.SYNC_SERVER.getName());
-  }
-
-  @Override
-  public String getBindIP() {
-    // TODO: Whether to change this config here
-    return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
-  }
-
-  @Override
-  public int getBindPort() {
-    // TODO: Whether to change this config here
-    return IoTDBDescriptor.getInstance().getConfig().getPipeServerPort();
-  }
-
-  //  @Override
-  public int getRPCPort() {
-    return getBindPort();
-  }
-
-  @Override
-  public void startService() throws StartupException {
-    // TODO: Whether to change this config here
-    super.startService();
-  }
-
-  @Override
-  public void stopService() {
-    // TODO: Whether to change this config here
-    super.stopService();
-  }
-
-  @TestOnly
-  public static void main(String[] args) throws TTransportException, StartupException {
-    logger.info("Transport server for testing only.");
-    TransportServerManager serverManager = new TransportServerManager();
-    serverManager.start();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManagerMBean.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManagerMBean.java
deleted file mode 100644
index 4559ea07f9..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManagerMBean.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.transport.server;
-
-import org.apache.iotdb.commons.exception.StartupException;
-
-public interface TransportServerManagerMBean {
-  String getRPCServiceStatus();
-
-  int getRPCPort();
-
-  void startService() throws StartupException;
-
-  void restartService() throws StartupException;
-
-  void stopService();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerThriftHandler.java
deleted file mode 100644
index 1045a71c6b..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerThriftHandler.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.iotdb.db.sync.transport.server;
-
-import org.apache.iotdb.db.service.metrics.MetricService;
-import org.apache.iotdb.db.service.metrics.enums.Metric;
-import org.apache.iotdb.db.service.metrics.enums.Tag;
-import org.apache.iotdb.metrics.utils.MetricLevel;
-
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.ServerContext;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TTransport;
-
-public class TransportServerThriftHandler implements TServerEventHandler {
-
-  private TransportServiceImpl serviceImpl;
-
-  public TransportServerThriftHandler(TransportServiceImpl serviceImpl) {
-    this.serviceImpl = serviceImpl;
-  }
-
-  @Override
-  public void preServe() {}
-
-  @Override
-  public ServerContext createContext(TProtocol input, TProtocol output) {
-    MetricService.getInstance()
-        .getOrCreateGauge(
-            Metric.THRIFT_CONNECTIONS.toString(),
-            MetricLevel.CORE,
-            Tag.NAME.toString(),
-            "Transport")
-        .incr(1L);
-    return null;
-  }
-
-  @Override
-  public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
-    // release query resources.
-    serviceImpl.handleClientExit();
-    MetricService.getInstance()
-        .getOrCreateGauge(
-            Metric.THRIFT_CONNECTIONS.toString(),
-            MetricLevel.CORE,
-            Tag.NAME.toString(),
-            "Transport")
-        .decr(1L);
-  }
-
-  @Override
-  public void processContext(
-      ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {}
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
index 9fce97b490..ec05de5d72 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SQLParserException;
 import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -55,12 +54,9 @@ import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowContinuousQueriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTriggersPlan;
-import org.apache.iotdb.db.qp.physical.sys.StartPipeServerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
-import org.apache.iotdb.db.qp.physical.sys.StopPipeServerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
 import org.apache.iotdb.db.query.executor.fill.PreviousFill;
 import org.apache.iotdb.db.service.IoTDB;
@@ -1214,30 +1210,6 @@ public class PhysicalPlanTest {
     Assert.assertEquals(ShowPlan.ShowContentType.TRIGGERS, plan.getShowContentType());
   }
 
-  @Test
-  public void testShowPipeServer() throws QueryProcessException {
-    String sql1 = "SHOW PIPESERVER";
-    ShowPipeServerPlan plan1 = (ShowPipeServerPlan) processor.parseSQLToPhysicalPlan(sql1);
-    Assert.assertTrue(plan1.isQuery());
-    Assert.assertEquals(ShowPlan.ShowContentType.PIPESERVER, plan1.getShowContentType());
-  }
-
-  @Test
-  public void testStartPipeServer() throws QueryProcessException {
-    String sql = "START PIPESERVER";
-    StartPipeServerPlan plan = (StartPipeServerPlan) processor.parseSQLToPhysicalPlan(sql);
-    Assert.assertFalse(plan.isQuery());
-    Assert.assertEquals(Operator.OperatorType.START_PIPE_SERVER, plan.getOperatorType());
-  }
-
-  @Test
-  public void testStopPipeServer() throws QueryProcessException {
-    String sql = "STOP PIPESERVER";
-    StopPipeServerPlan plan = (StopPipeServerPlan) processor.parseSQLToPhysicalPlan(sql);
-    Assert.assertFalse(plan.isQuery());
-    Assert.assertEquals(OperatorType.STOP_PIPE_SERVER, plan.getOperatorType());
-  }
-
   @Test
   public void testCreateCQ1() throws QueryProcessException {
     String sql =
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
index f21190a275..fc68a92324 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/SyncInfoTest.java
@@ -54,7 +54,6 @@ public class SyncInfoTest {
   public void testOperatePipe() throws Exception {
     SyncInfo syncInfo = new SyncInfo();
     try {
-      syncInfo.startServer();
       CreatePipeSinkPlan createPipeSinkPlan = new CreatePipeSinkPlan("demo", "iotdb");
       createPipeSinkPlan.addPipeSinkAttribute("ip", "127.0.0.1");
       createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
index ad15ad606b..0d53ecf0dc 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/SyncLogTest.java
@@ -64,15 +64,12 @@ public class SyncLogTest {
   public void testServiceLog() {
     try {
       SyncLogWriter log = SyncLogWriter.getInstance();
-      log.startPipeServer();
       CreatePipeSinkPlan createPipeSinkPlan = new CreatePipeSinkPlan("demo", "iotdb");
       createPipeSinkPlan.addPipeSinkAttribute("ip", "127.0.0.1");
       createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
       log.addPipeSink(createPipeSinkPlan);
       log.addPipe(new CreatePipePlan(pipe1, "demo"), 1);
       log.operatePipe(pipe1, Operator.OperatorType.DROP_PIPE);
-      log.stopPipeServer();
-      log.startPipeServer();
 
       log.addPipe(new CreatePipePlan(pipe2, "demo"), 2);
       log.operatePipe(pipe1, Operator.OperatorType.STOP_PIPE);
@@ -83,7 +80,6 @@ public class SyncLogTest {
       List<PipeInfo> pipes = syncLogReader.getAllPipeInfos();
       Map<String, PipeSink> allPipeSinks = syncLogReader.getAllPipeSinks();
       PipeInfo runningPipe = syncLogReader.getRunningPipeInfo();
-      Assert.assertTrue(syncLogReader.isPipeServerEnable());
       Assert.assertEquals(1, allPipeSinks.size());
       Assert.assertEquals(2, pipes.size());
       Assert.assertEquals(pipe2, runningPipe.getPipeName());
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/transport/TransportServiceTest.java b/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
similarity index 93%
rename from server/src/test/java/org/apache/iotdb/db/sync/transport/TransportServiceTest.java
rename to server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
index 33a6712908..afb932cf27 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/transport/TransportServiceTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/transport/SyncTransportTest.java
@@ -31,8 +31,7 @@ import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
-import org.apache.iotdb.db.sync.transport.client.IoTDBSInkTransportClient;
-import org.apache.iotdb.db.sync.transport.server.TransportServerManager;
+import org.apache.iotdb.db.sync.transport.client.IoTDBSinkTransportClient;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.SessionDataSet;
@@ -53,7 +52,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-public class TransportServiceTest {
+public class SyncTransportTest {
   /** create tsfile and move to tmpDir for sync test */
   File tmpDir = new File("target/synctest");
 
@@ -130,23 +129,17 @@ public class TransportServiceTest {
     Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0, 33, 38);
     pipeDataList.add(new DeletionPipeData(deletion, serialNum++));
 
-    // 3. start server
-    TransportServerManager.getInstance().startService();
-
-    // 4. start client
+    // 3. start client
     Pipe pipe = new TsFilePipe(createdTime1, pipeName1, null, 0, false);
-    IoTDBSInkTransportClient client =
-        new IoTDBSInkTransportClient(
-            pipe,
-            "127.0.0.1",
-            IoTDBDescriptor.getInstance().getConfig().getPipeServerPort(),
-            "127.0.0.1");
+    IoTDBSinkTransportClient client =
+        new IoTDBSinkTransportClient(
+            pipe, "127.0.0.1", IoTDBDescriptor.getInstance().getConfig().getRpcPort(), "127.0.0.1");
     client.handshake();
     for (PipeData pipeData : pipeDataList) {
       client.senderTransport(pipeData);
     }
 
-    // 5. check result
+    // 4. check result
     checkResult(
         "select ** from root.vehicle",
         new String[] {"Time", "root.vehicle.d0.s0"},
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b455746b09..7827dae848 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -70,6 +70,9 @@ public enum TSStatusCode {
   MEASUREMENT_ALREADY_EXIST(338),
   TEMPLATE_NOT_EXIST(339),
   CREATE_TEMPLATE_ERROR(340),
+  SYNC_FILE_REBASE(341),
+  SYNC_FILE_RETRY(342),
+  SYNC_FILE_ERROR(343),
 
   EXECUTE_STATEMENT_ERROR(400),
   SQL_PARSE_ERROR(401),
diff --git a/thrift-sync/README.md b/thrift-sync/README.md
deleted file mode 100644
index 94c1de6654..0000000000
--- a/thrift-sync/README.md
+++ /dev/null
@@ -1,22 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-
-This modules maintains all RPC interfaces for data synchronization among servers.
\ No newline at end of file
diff --git a/thrift-sync/pom.xml b/thrift-sync/pom.xml
deleted file mode 100644
index 28958a86e8..0000000000
--- a/thrift-sync/pom.xml
+++ /dev/null
@@ -1,62 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.iotdb</groupId>
-        <artifactId>iotdb-parent</artifactId>
-        <version>0.14.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-    <artifactId>iotdb-thrift-sync</artifactId>
-    <name>rpc-thrift-sync</name>
-    <description>RPC (Thrift) framework among servers for data synchronization.</description>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.thrift</groupId>
-            <artifactId>libthrift</artifactId>
-        </dependency>
-    </dependencies>
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>build-helper-maven-plugin</artifactId>
-                <version>3.2.0</version>
-                <executions>
-                    <execution>
-                        <id>add-source</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>${project.build.directory}/generated-sources/thrift</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git a/thrift-sync/rpc-changelist.md b/thrift-sync/rpc-changelist.md
deleted file mode 100644
index 797c0b6767..0000000000
--- a/thrift-sync/rpc-changelist.md
+++ /dev/null
@@ -1,181 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-    
-        http://www.apache.org/licenses/LICENSE-2.0
-    
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-# 0.11.x(version-2) -> 0.12.x(version-1)
-
-Last Updated on 2021.01.19 by Xiangwei Wei.
-
-
-## 1. Delete Old
-
-| Latest Changes                     | Related Committers |
-| ---------------------------------- | ------------------ |
-
-
-## 2. Add New
-
-| Latest Changes                                               | Related Committers     |
-| ------------------------------------------------------------ | ---------------------- |
-| Add timeout in TSFetchResultsReq and TSExecuteStatementReq | Xiangwei Wei | 
-
-
-## 3. Update
-
-| Latest Changes                                               | Related Committers     |
-| ------------------------------------------------------------ | ---------------------- |
-
-
-# 0.10.x (version-2) -> 0.11.x (version-3)
-
-Last Updated on 2020-10-27 by Xiangwei Wei.
-
-
-## 1. Delete Old
-
-| Latest Changes                     | Related Committers |
-| ---------------------------------- | ------------------ |
-| Remove TSBatchExecuteStatementResp            | Tian Jiang         |
-
-
-## 2. Add New
-
-| Latest Changes                                               | Related Committers     |
-| ------------------------------------------------------------ | ---------------------- |
-| set the input/output as TFramedTransport      |  Tian Jiang        |
-| Add timeout(optional) in TSFetchResultsReq and TSExecuteStatementReq | Xiangwei Wei | 
-
-
-## 3. Update
-
-| Latest Changes                                               | Related Committers     |
-| ------------------------------------------------------------ | ---------------------- |
-| Add sub-status in TSStatus  | Tian Jiang  |
-| Change the result of executeBatchStatement  as   TSStatus    | Tian Jiang  |
-| Change TSDeleteDataReq, delete timestamp and add startTime and endTime   | Wei Shao   |
-| Add zoneId in TSOpenSessionReq | Xiangwei Wei |
-
-
-# 0.9.x (version-1) -> 0.10.x (version-2)
-
-Last Updated on 2020-5-25 by Kaifeng Xue.
-
-
-## 1. Delete Old
-
-| Latest Changes                     | Related Committers |
-| ---------------------------------- | ------------------ |
-| Remove TS_SessionHandle,TSHandleIdentifier            | Tian Jiang         |
-| Remove TSStatus,TSExecuteInsertRowInBatchResp            | Jialin Qiao|
-
-
-## 2. Add New
-
-| Latest Changes                                               | Related Committers                 |
-| ------------------------------------------------------------ | ---------------------------------- |
-| Add parameter sessionId in getTimeZone, getProperties, setStorageGroup, createTimeseries... | Tian Jiang|
-| Add struct TSQueryNonAlignDataSet                            | Haonan Hou|
-| Add struct TSInsertTabletsReq                            | Jialin Qiao|
-| Add method insertTablets                            | Jialin Qiao|
-| Add method testInsertTablets                            | Xiangdong Huang |
-| add new field `inferType` in TSInsertRecordReq  | Jialin Qiao      |
-
-## 3. Update
-
-| Latest Changes                                               | Related Committers     |
-| ------------------------------------------------------------ | ---------------------- |
-| Replace TS_SessionHandles with SessionIds, TSOperationHandle with queryIds  | Tian Jiang  |
-| Add optional TSQueryNonAlignDataSet in TSExecuteStatementResp, TSFetchResultsResp and required bool isAlign in TSFetchResultsReq | Haonan Hou |
-| Rename TSStatusType to TSStatus   | Jialin Qiao   |
-| Remove sessionId in TSExecuteBatchStatementResp   | Jialin Qiao   |
-| Rename insertRows to insertReords, insert to insertRecord, insertBatch to insertTablet   | Jialin Qiao   |
-| Use TsDataType and binary rather than string in TSInsertInBatchReq and TSInsertReq  | Kaifeng Xue  |
-
-
-
-# 0.8.x -> 0.9.x (version-1)
-
-Last Updated on 2019-10-27 by Lei Rui.
-
-
-## 1. Delete Old
-
-| Latest Changes                     | Related Committers |
-| ---------------------------------- | ------------------ |
-| Delete struct TSSetStorageGroupReq | Jialin Qiao        |
-| Remove struct TSDataValue          | Lei Rui            |
-| Remove struct TSRowRecord          | Lei Rui            |
-| Remove optional string version in TSFetchMetadataResp | Genius_pig |
-| Remove optional set<string> childPaths, nodesList, storageGroups, devices in TSFetchMetadataResp | Genius_pig |
-| Remove optional map<string, string> nodeTimeseriesNum in TSFetchMetadataResp | Genius_pig |
-| Remove optional list<list<string>> timeseriesList in TSFetchMetadataResp | Genius_pig |
-| Remove optinoal optional i32 timeseriesNum in TSFetchMetadataResp | Genius_pig |
-| Remove optional i32 nodeLevel in TSFetchMetadataReq | Genius_pig |
-
-
-## 2. Add New
-
-| Latest Changes                                               | Related Committers                 |
-| ------------------------------------------------------------ | ---------------------------------- |
-| Add struct TSBatchInsertionReq                               | qiaojialin                         |
-| Add method TSExecuteBatchStatementResp insertBatch(1:TSBatchInsertionReq req) | qiaojialin                         |
-| Add Struct TSStatusType                                      | Zesong Sun                         |
-| Add TSCreateTimeseriesReq                                    | Zesong Sun                         |
-| Add method TSStatus setStorageGroup(1:string storageGroup)   | Zesong Sun, Jialin Qiao            |
-| Add method TSStatus createTimeseries(1:TSCreateTimeseriesReq req) | Zesong Sun                         |
-| Add struct TSInsertReq                                       | qiaojialin                         |
-| Add method TSRPCResp insertRow(1:TSInsertReq req)            | qiaojialin                         |
-| Add struct TSDeleteDataReq                                   | Jack Tsai, qiaojialin              |
-| Add method TSStatus deleteData(1:TSDeleteDataReq req)        | Jack Tsai, Jialin Qiao, qiaojialin |
-| Add method TSStatus deleteTimeseries(1:list\<string> path)   | qiaojialin                         |
-| Add method TSStatus deleteStorageGroups(1:list\<string> storageGroup) | Yi Tao                             |
-| Add Struct TSExecuteInsertRowInBatchResp                     | Kaifeng Xue |
-| Add method insertRowInBatch(1:TSInsertInBatchReq req);       | Kaifeng Xue |
-| Add method testInsertRowInBatch(1:TSInsertInBatchReq req);   | Kaifeng Xue |
-| Add method testInsertRow(1:TSInsertReq req);                 | Kaifeng Xue |
-| Add method testInsertBatch(1:TSBatchInsertionReq req);       | Kaifeng Xue |
-| Add struct TSCreateMultiTimeseriesReq                        | qiaojialin |
-| Add method createMultiTimeseries(1:TSCreateMultiTimeseriesReq req);       | qiaojialin |
-
-
-## 3. Update
-
-| Latest Changes                                               | Related Committers     |
-| ------------------------------------------------------------ | ---------------------- |
-| Add required string timestampPrecision in ServerProperties   | 1160300922             |
-| Add optional list\<string\> dataTypeList in TSExecuteStatementResp | suyue                  |
-| Update TSStatus to use TSStatusType, instead of using ~~TS_StatusCode, errorCode and errorMessage~~ | Zesong Sun             |
-| Rename item in enum TSProtocolVersion from ~~TSFILE_SERVICE_PROTOCOL_V1~~ to IOTDB_SERVICE_PROTOCOL_V1 | qiaojialin             |
-| Rename method name from ~~TSExecuteStatementResp executeInsertion(1:TSInsertionReq req)~~ to TSExecuteStatementResp insert(1:TSInsertionReq req) | qiaojialin             |
-| Add required i32 compressor in TSCreateTimeseriesReq         | Jialin Qiao            |
-| Add optional list\<string> nodesList, optional map\<string, string> nodeTimeseriesNum in TSFetchMetadataResp | jack870131             |
-| Add optional i32 nodeLevel in TSFetchMetadataReq             | jack870131, Zesong Sun |
-| Change the following methods' returned type to be TSStatus: <br />TSStatus closeSession(1:TSCloseSessionReq req), <br />TSStatus cancelOperation(1:TSCancelOperationReq req), <br />TSStatus closeOperation(1:TSCloseOperationReq req), <br />TSStatus setTimeZone(1:TSSetTimeZoneReq req), <br />TSStatus setStorageGroup(1:string storageGroup), <br />TSStatus createTimeseries(1:TSCreateTimeseriesReq req), <br />TSStatus insertRow(1:TSInsertReq req), <br />TSStatus deleteData(1:TSDeleteDataReq  [...]
-| Change from ~~required string path~~ to required list\<string> paths in TSDeleteDataReq | qiaojialin             |
-| Add optional set\<string> devices in TSFetchMetadataResp     | Zesong Sun             |
-| Rename some fields in TSFetchMetadataResp: ~~ColumnsList~~ to columnsList, ~~showTimeseriesList~~ to timeseriesList, ~~showStorageGroups~~ to storageGroups | Zesong Sun             |
-| Change struct TSQueryDataSet to eliminate row-wise rpc writing | Lei Rui                |
-| Add optional i32 timeseriesNum in TSFetchMetadataResp        | Jack Tsai              |
-| Add required i64 queryId in TSHandleIdentifier               | Yuan Tian    |
-| Add optional set\<string> childPaths in TSFetchMetadataResp     | Haonan Hou             |
-| Add optional string version in TSFetchMetadataResp           | Genius_pig             |
-| Add required i64 statementId in TSExecuteStatementReq        | Yuan Tian |
-| Add required binary time, required list<binary> valueList, required list<binary> bitmapList and remove required binary values, required i32 rowCount in TSQueryDataSet| Yuan Tian |
-| Add optional i32 fetchSize in TSExecuteStatementReq,<br />Add optional TSQueryDataSet in TSExecuteStatementResp| liutaohua |
-| Add optional map<string, string> props, optional map<string, string> tags, optional map<string, string> attributes and optional string aliasPath in TSCreateTimeseriesReq | Yuan Tian | 
diff --git a/thrift-sync/src/main/thrift/transport.thrift b/thrift-sync/src/main/thrift/transport.thrift
deleted file mode 100644
index 8218a7813a..0000000000
--- a/thrift-sync/src/main/thrift/transport.thrift
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-namespace java org.apache.iotdb.service.transport.thrift
-namespace py iotdb.thrift.transport
-
-struct TransportStatus{
-  1:required i32 code
-  2:required string msg
-}
-
-// The sender and receiver need to check some info to confirm validity
-struct IdentityInfo{
-  // Check whether the ip of sender is in the white list of receiver.
-  1:required string address
-
-  // Sender needs to tell receiver its identity.
-  2:required string pipeName
-  3:required i64 createTime
-
-  // The version of sender and receiver need to be the same.
-  4:required string version
-
-}
-
-enum Type {
-  TSFILE,
-  DELETION,
-  PHYSICALPLAN,
-  FILE
-}
-
-struct MetaInfo{
-  // The type of the pipeData in sending.
-  1:required Type type
-
-  // The name of the file in sending.
-  2:required string fileName
-
-  // The start index of the file slice in sending.
-  3:required i64 startIndex
-}
-
-service TransportService{
-  TransportStatus handshake(IdentityInfo info);
-  TransportStatus transportData(1:MetaInfo metaInfo, 2:binary buff, 3:binary digest);
-  TransportStatus checkFileDigest(1:MetaInfo metaInfo, 2:binary digest);
-}
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index f2ecbeedea..014bda77dd 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -409,6 +409,33 @@ struct TSDropSchemaTemplateReq {
   2: required string templateName
 }
 
+// The sender and receiver need to check some info to confirm validity
+struct TSyncIdentityInfo{
+  // Check whether the ip of sender is in the white list of receiver.
+  1:required string address
+  // Sender needs to tell receiver its identity.
+  2:required string pipeName
+  3:required i64 createTime
+  // The version of sender and receiver need to be the same.
+  4:required string version
+}
+
+enum TSyncTransportType {
+  TSFILE,
+  DELETION,
+  PHYSICALPLAN,
+  FILE
+}
+
+struct TSyncTransportMetaInfo{
+  // The type of the pipeData in sending.
+  1:required TSyncTransportType type
+  // The name of the file in sending.
+  2:required string fileName
+  // The start index of the file slice in sending.
+  3:required i64 startIndex
+}
+
 service IClientRPCService {
   TSOpenSessionResp openSession(1:TSOpenSessionReq req);
 
@@ -499,4 +526,10 @@ service IClientRPCService {
   common.TSStatus unsetSchemaTemplate(1:TSUnsetSchemaTemplateReq req);
 
   common.TSStatus dropSchemaTemplate(1:TSDropSchemaTemplateReq req);
+
+  common.TSStatus handshake(TSyncIdentityInfo info);
+
+  common.TSStatus transportData(1:TSyncTransportMetaInfo metaInfo, 2:binary buff, 3:binary digest);
+
+  common.TSStatus checkFileDigest(1:TSyncTransportMetaInfo metaInfo, 2:binary digest);
 }
\ No newline at end of file