You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/20 07:14:36 UTC

[iotdb] branch master updated: [IOTDB-5847] Pipe: IoTDB Thrift Connector and PipeReceiverAgent (#9829)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 738a1389842 [IOTDB-5847] Pipe: IoTDB Thrift Connector and PipeReceiverAgent (#9829)
738a1389842 is described below

commit 738a1389842b06ecd3747045c57dc3d5c7dd50bc
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Sat May 20 15:14:29 2023 +0800

    [IOTDB-5847] Pipe: IoTDB Thrift Connector and PipeReceiverAgent (#9829)
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../persistence/pipe/PipePluginInfo.java           |  44 ++--
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  67 ++---
 .../runtime/PipeHandleLeaderChangeProcedure.java   |   3 +-
 .../pipe/task/AbstractOperatePipeProcedureV2.java  |  11 +-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  16 +-
 .../impl/pipe/task/DropPipeProcedureV2.java        |   4 +-
 .../impl/pipe/task/StartPipeProcedureV2.java       |   4 +-
 .../impl/pipe/task/StopPipeProcedureV2.java        |   4 +-
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |   6 +-
 .../{DefaultCollector.java => IoTDBCollector.java} |   2 +-
 .../IoTDBThriftConnector.java}                     |  48 ++--
 .../org/apache/iotdb/pipe/api/PipeConnector.java   |   4 +
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  41 +++
 .../db/engine/storagegroup/TsFileResource.java     |   2 +-
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  |  12 +
 .../db/pipe/agent/plugin/PipePluginAgent.java      |   2 +-
 .../receiver/IoTDBThriftReceiver.java}             |  19 +-
 .../db/pipe/agent/receiver/PipeReceiverAgent.java  |  90 +++++++
 .../apache/iotdb/db/pipe/config/PipeConfig.java    |  16 ++
 .../db/pipe/config/PipeConnectorConstant.java      |   3 +
 .../core/collector/IoTDBDataRegionCollector.java   |  39 ++-
 .../PipeHistoricalDataRegionTsFileCollector.java   |  13 +-
 .../PipeRealtimeDataRegionHybridCollector.java     |   6 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |   3 +-
 .../listener/PipeInsertionDataNodeListener.java    |   4 +-
 .../impl/iotdb/IoTDBThriftConnectorClient.java     |  70 ++++++
 .../impl/iotdb/IoTDBThriftConnectorVersion.java}   |  16 +-
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      | 231 +++++++++++++++++
 .../impl/iotdb/v1/IoTDBThriftReceiverV1.java       | 275 +++++++++++++++++++++
 .../connector/impl/iotdb/v1/PipeRequestType.java   |  56 +++++
 .../v1/reponse/PipeTransferFilePieceResp.java      |  80 ++++++
 .../iotdb/v1/request/PipeTransferFilePieceReq.java |  88 +++++++
 .../iotdb/v1/request/PipeTransferFileSealReq.java  |  79 ++++++
 .../iotdb/v1/request/PipeTransferHandshakeReq.java |  71 ++++++
 .../v1/request/PipeTransferInsertNodeReq.java      | 102 ++++++++
 .../PipeConnectorSubtaskLifeCycle.java             |   2 +-
 .../{ => manager}/PipeConnectorSubtaskManager.java |  18 +-
 .../core/event/impl/PipeTabletInsertionEvent.java  |   4 +
 .../core/event/impl/PipeTsFileInsertionEvent.java  |  37 ++-
 .../realtime/PipeRealtimeCollectEventFactory.java  |   6 +-
 .../event/view/collector/PipeEventCollector.java   |   3 +-
 .../execution/executor/PipeSubtaskExecutor.java    |  15 +-
 .../org/apache/iotdb/db/pipe/task/PipeBuilder.java |   8 +-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |  25 +-
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  11 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  19 +-
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  29 ++-
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |  21 +-
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    |  28 ++-
 .../service/thrift/impl/ClientRPCServiceImpl.java  |   9 +
 .../pipe/core/connector/PipeThriftRequestTest.java | 118 +++++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   9 +-
 thrift/src/main/thrift/client.thrift               |  13 +
 53 files changed, 1727 insertions(+), 179 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index c954df5770e..f839f6251dc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -56,8 +56,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DEFAULT_COLLECTOR;
 import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DO_NOTHING_PROCESSOR;
+import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.IOTDB_COLLECTOR;
 
 public class PipePluginInfo implements SnapshotProcessor {
 
@@ -121,17 +121,19 @@ public class PipePluginInfo implements SnapshotProcessor {
     return !pipePluginMetaKeeper.containsJar(jarName);
   }
 
-  public boolean checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
+  public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
     final PipeParameters collectorParameters =
         new PipeParameters(createPipeRequest.getCollectorAttributes());
     final String collectorPluginName =
         collectorParameters.getStringOrDefault(
-            PipeCollectorConstant.COLLECTOR_KEY, DEFAULT_COLLECTOR.getPipePluginName());
+            PipeCollectorConstant.COLLECTOR_KEY, IOTDB_COLLECTOR.getPipePluginName());
     if (!pipePluginMetaKeeper.containsPipePlugin(collectorPluginName)) {
-      LOGGER.warn(
-          "Failed to create pipe, the pipe collector plugin {} does not exist",
-          collectorPluginName);
-      return false;
+      final String exceptionMessage =
+          String.format(
+              "Failed to create pipe, the pipe collector plugin %s does not exist",
+              collectorPluginName);
+      LOGGER.warn(exceptionMessage);
+      throw new PipeManagementException(exceptionMessage);
     }
 
     final PipeParameters processorParameters =
@@ -140,28 +142,32 @@ public class PipePluginInfo implements SnapshotProcessor {
         processorParameters.getStringOrDefault(
             PipeProcessorConstant.PROCESSOR_KEY, DO_NOTHING_PROCESSOR.getPipePluginName());
     if (!pipePluginMetaKeeper.containsPipePlugin(processorPluginName)) {
-      LOGGER.warn(
-          "Failed to create pipe, the pipe processor plugin {} does not exist",
-          processorPluginName);
-      return false;
+      final String exceptionMessage =
+          String.format(
+              "Failed to create pipe, the pipe processor plugin %s does not exist",
+              processorPluginName);
+      LOGGER.warn(exceptionMessage);
+      throw new PipeManagementException(exceptionMessage);
     }
 
     final PipeParameters connectorParameters =
         new PipeParameters(createPipeRequest.getConnectorAttributes());
     if (!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
-      LOGGER.warn("Failed to create pipe, the pipe connector plugin is not specified");
-      return false;
+      final String exceptionMessage =
+          "Failed to create pipe, the pipe connector plugin is not specified";
+      LOGGER.warn(exceptionMessage);
+      throw new PipeManagementException(exceptionMessage);
     }
     final String connectorPluginName =
         connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY);
     if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
-      LOGGER.warn(
-          "Failed to create pipe, the pipe connector plugin {} does not exist",
-          connectorPluginName);
-      return false;
+      final String exceptionMessage =
+          String.format(
+              "Failed to create pipe, the pipe connector plugin %s does not exist",
+              connectorPluginName);
+      LOGGER.warn(exceptionMessage);
+      throw new PipeManagementException(exceptionMessage);
     }
-
-    return true;
   }
 
   /////////////////////////////// Pipe Plugin Management ///////////////////////////////
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 528725b1327..5ac5b7fca21 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStat
 import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -71,59 +72,67 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
   /////////////////////////////// Validator ///////////////////////////////
 
-  public boolean checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
+  public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest)
+      throws PipeManagementException {
     if (!isPipeExisted(createPipeRequest.getPipeName())) {
-      return true;
+      return;
     }
 
-    LOGGER.info(
+    final String exceptionMessage =
         String.format(
-            "Failed to create pipe [%s], the pipe with the same name has been created",
-            createPipeRequest.getPipeName()));
-    return false;
+            "Failed to create pipe %s, the pipe with the same name has been created",
+            createPipeRequest.getPipeName());
+    LOGGER.info(exceptionMessage);
+    throw new PipeManagementException(exceptionMessage);
   }
 
-  public boolean checkBeforeStartPipe(String pipeName) {
+  public void checkBeforeStartPipe(String pipeName) throws PipeManagementException {
     if (!isPipeExisted(pipeName)) {
-      LOGGER.info(String.format("Failed to start pipe [%s], the pipe does not exist", pipeName));
-      return false;
+      final String exceptionMessage =
+          String.format("Failed to start pipe %s, the pipe does not exist", pipeName);
+      LOGGER.info(exceptionMessage);
+      throw new PipeManagementException(exceptionMessage);
     }
 
     final PipeStatus pipeStatus = getPipeStatus(pipeName);
     if (pipeStatus == PipeStatus.RUNNING) {
-      LOGGER.info(
-          String.format("Failed to start pipe [%s], the pipe is already running", pipeName));
-      return false;
+      final String exceptionMessage =
+          String.format("Failed to start pipe %s, the pipe is already running", pipeName);
+      LOGGER.info(exceptionMessage);
+      throw new PipeManagementException(exceptionMessage);
     }
     if (pipeStatus == PipeStatus.DROPPED) {
-      LOGGER.info(
-          String.format("Failed to start pipe [%s], the pipe is already dropped", pipeName));
-      return false;
+      final String exceptionMessage =
+          String.format("Failed to start pipe %s, the pipe is already dropped", pipeName);
+      LOGGER.info(exceptionMessage);
+      throw new PipeManagementException(exceptionMessage);
     }
-
-    return true;
   }
 
-  public boolean checkBeforeStopPipe(String pipeName) {
+  public void checkBeforeStopPipe(String pipeName) throws PipeManagementException {
     if (!isPipeExisted(pipeName)) {
-      LOGGER.info(String.format("Failed to stop pipe [%s], the pipe does not exist", pipeName));
-      return false;
+      final String exceptionMessage =
+          String.format("Failed to stop pipe %s, the pipe does not exist", pipeName);
+      LOGGER.info(exceptionMessage);
+      throw new PipeManagementException(exceptionMessage);
     }
 
     final PipeStatus pipeStatus = getPipeStatus(pipeName);
     if (pipeStatus == PipeStatus.STOPPED) {
-      LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already stop", pipeName));
-      return false;
+      final String exceptionMessage =
+          String.format("Failed to stop pipe %s, the pipe is already stop", pipeName);
+      LOGGER.info(exceptionMessage);
+      throw new PipeManagementException(exceptionMessage);
     }
     if (pipeStatus == PipeStatus.DROPPED) {
-      LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already dropped", pipeName));
-      return false;
+      final String exceptionMessage =
+          String.format("Failed to stop pipe %s, the pipe is already dropped", pipeName);
+      LOGGER.info(exceptionMessage);
+      throw new PipeManagementException(exceptionMessage);
     }
-
-    return true;
   }
 
-  public boolean checkBeforeDropPipe(String pipeName) {
+  public void checkBeforeDropPipe(String pipeName) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug(
           "Check before drop pipe {}, pipe exists: {}.",
@@ -131,8 +140,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
           isPipeExisted(pipeName) ? "true" : "false");
     }
     // no matter whether the pipe exists, we allow the drop operation executed on all nodes to
-    // ensure the consistency
-    return true;
+    // ensure the consistency.
+    // DO NOTHING HERE!
   }
 
   private boolean isPipeExisted(String pipeName) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 42de84cc9e5..3fbe3091101 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -65,11 +65,10 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
   }
 
   @Override
-  protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
+  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeHandleLeaderChangeProcedure: executeFromValidateTask");
 
     // nothing needs to be checked
-    return true;
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
index 402a9d43bf3..15065499562 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
 import org.apache.iotdb.commons.exception.sync.PipeException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -66,8 +65,7 @@ public abstract class AbstractOperatePipeProcedureV2
    *
    * @return true if procedure can finish directly
    */
-  protected abstract boolean executeFromValidateTask(ConfigNodeProcedureEnv env)
-      throws PipeException, PipeSinkException;
+  protected abstract void executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeException;
 
   /** Execute at state CALCULATE_INFO_FOR_TASK */
   protected abstract void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env)
@@ -88,10 +86,7 @@ public abstract class AbstractOperatePipeProcedureV2
       switch (state) {
         case VALIDATE_TASK:
           env.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
-          if (!executeFromValidateTask(env)) {
-            env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
-            return Flow.NO_MORE_STATE;
-          }
+          executeFromValidateTask(env);
           setNextState(OperatePipeTaskState.CALCULATE_INFO_FOR_TASK);
           break;
         case CALCULATE_INFO_FOR_TASK:
@@ -107,7 +102,7 @@ public abstract class AbstractOperatePipeProcedureV2
           env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
           return Flow.NO_MORE_STATE;
       }
-    } catch (PipeException | PipeSinkException | IOException e) {
+    } catch (Exception e) {
       if (isRollbackSupported(state)) {
         LOGGER.error("Fail in OperatePipeProcedure", e);
         setFailure(new ProcedureException(e.getMessage()));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index ec45d1c26b4..e0de835fb07 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -69,19 +69,17 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
+  protected void executeFromValidateTask(ConfigNodeProcedureEnv env)
+      throws PipeManagementException {
     LOGGER.info(
         "CreatePipeProcedureV2: executeFromValidateTask({})", createPipeRequest.getPipeName());
 
     final PipeManager pipeManager = env.getConfigManager().getPipeManager();
-    return pipeManager
-            .getPipePluginCoordinator()
-            .getPipePluginInfo()
-            .checkBeforeCreatePipe(createPipeRequest)
-        && pipeManager
-            .getPipeTaskCoordinator()
-            .getPipeTaskInfo()
-            .checkBeforeCreatePipe(createPipeRequest);
+    pipeManager
+        .getPipePluginCoordinator()
+        .getPipePluginInfo()
+        .checkBeforeCreatePipe(createPipeRequest);
+    pipeManager.getPipeTaskCoordinator().getPipeTaskInfo().checkBeforeCreatePipe(createPipeRequest);
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index d5826af1081..5a715ae3534 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -55,11 +55,11 @@ public class DropPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env)
+  protected void executeFromValidateTask(ConfigNodeProcedureEnv env)
       throws PipeManagementException {
     LOGGER.info("DropPipeProcedureV2: executeFromValidateTask({})", pipeName);
 
-    return env.getConfigManager()
+    env.getConfigManager()
         .getPipeManager()
         .getPipeTaskCoordinator()
         .getPipeTaskInfo()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index 8163f625ef2..ecd4cdc57c0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -56,11 +56,11 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env)
+  protected void executeFromValidateTask(ConfigNodeProcedureEnv env)
       throws PipeManagementException {
     LOGGER.info("StartPipeProcedureV2: executeFromValidateTask({})", pipeName);
 
-    return env.getConfigManager()
+    env.getConfigManager()
         .getPipeManager()
         .getPipeTaskCoordinator()
         .getPipeTaskInfo()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index 4a8279d583b..fcd015fb940 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -56,11 +56,11 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected boolean executeFromValidateTask(ConfigNodeProcedureEnv env)
+  protected void executeFromValidateTask(ConfigNodeProcedureEnv env)
       throws PipeManagementException {
     LOGGER.info("StopPipeProcedureV2: executeFromValidateTask({})", pipeName);
 
-    return env.getConfigManager()
+    env.getConfigManager()
         .getPipeManager()
         .getPipeTaskCoordinator()
         .getPipeTaskInfo()
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 6e59cab398e..d1a95bb45ba 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -19,20 +19,22 @@
 
 package org.apache.iotdb.commons.pipe.plugin.builtin;
 
-import org.apache.iotdb.commons.pipe.plugin.builtin.collector.DefaultCollector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.collector.IoTDBCollector;
 import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
 import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
 
 public enum BuiltinPipePlugin {
 
   // collectors
-  DEFAULT_COLLECTOR("default_collector", DefaultCollector.class),
+  IOTDB_COLLECTOR("iotdb_collector", IoTDBCollector.class),
 
   // processors
   DO_NOTHING_PROCESSOR("do_nothing_processor", DoNothingProcessor.class),
 
   // connectors
   DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class),
+  IOTDB_THRIFT_CONNECTOR("iotdb_thrift_connector", IoTDBThriftConnector.class);
   ;
 
   private final String pipePluginName;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
similarity index 97%
copy from node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
index fed08ae2197..9902cce4ca0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/IoTDBCollector.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.pipe.api.event.Event;
  * imported here. The pipe agent in the server module will replace this class with the real
  * implementation when initializing the collector.
  */
-public class DefaultCollector implements PipeCollector {
+public class IoTDBCollector implements PipeCollector {
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
similarity index 51%
rename from node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
index fed08ae2197..e252e5be726 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/collector/DefaultCollector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
@@ -17,44 +17,62 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.plugin.builtin.collector;
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
 
-import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
-import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 /**
- * This class is a placeholder and should not be initialized. It represents the default collector
- * when no collector is specified. There is a real implementation in the server module but cannot be
- * imported here. The pipe agent in the server module will replace this class with the real
- * implementation when initializing the collector.
+ * This class is a placeholder and should not be initialized. It represents the IoTDB Thrift
+ * connector. There is a real implementation in the server module but cannot be imported here. The
+ * pipe agent in the server module will replace this class with the real implementation when
+ * initializing the IoTDB Thrift connector.
  */
-public class DefaultCollector implements PipeCollector {
+public class IoTDBThriftConnector implements PipeConnector {
+
+  @Override
+  public void validate(PipeParameterValidator validator) {
+    throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+  }
+
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) {
+    throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+  }
+
+  @Override
+  public void handshake() {
+    throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
+  }
+
   @Override
-  public void validate(PipeParameterValidator validator) throws Exception {
+  public void heartbeat() {
     throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
   }
 
   @Override
-  public void customize(PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration)
-      throws Exception {
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) {
     throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
   }
 
   @Override
-  public void start() throws Exception {
+  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
     throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
   }
 
   @Override
-  public Event supply() throws Exception {
+  public void transfer(DeletionEvent deletionEvent) {
     throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
   }
 
   @Override
-  public void close() throws Exception {
+  public void close() {
     throw new UnsupportedOperationException("This class is a placeholder and should not be used.");
   }
 }
diff --git a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
index 2c9bbff5aa6..5502de8bb37 100644
--- a/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
+++ b/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfig
 import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 
 /**
  * PipeConnector
@@ -114,6 +115,7 @@ public interface PipeConnector extends PipePlugin {
    * This method is used to transfer the TabletInsertionEvent.
    *
    * @param tabletInsertionEvent TabletInsertionEvent to be transferred
+   * @throws PipeConnectionException if the connection is broken
    * @throws Exception the user can throw errors if necessary
    */
   void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception;
@@ -122,6 +124,7 @@ public interface PipeConnector extends PipePlugin {
    * This method is used to transfer the TsFileInsertionEvent.
    *
    * @param tsFileInsertionEvent TsFileInsertionEvent to be transferred
+   * @throws PipeConnectionException if the connection is broken
    * @throws Exception the user can throw errors if necessary
    */
   void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception;
@@ -130,6 +133,7 @@ public interface PipeConnector extends PipePlugin {
    * This method is used to transfer the DeletionEvent.
    *
    * @param deletionEvent DeletionEvent to be transferred
+   * @throws PipeConnectionException if the connection is broken
    * @throws Exception the user can throw errors if necessary
    */
   void transfer(DeletionEvent deletionEvent) throws Exception;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6ebfff6ffca..c8131ebf286 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -94,6 +94,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
 
@@ -700,6 +701,46 @@ public class StorageEngine implements IService {
     }
   }
 
+  /**
+   * run the runnable if the region is absent. if the region is present, do nothing.
+   *
+   * <p>the mapping function passed to computeIfAbsent always returns null, so no new region is
+   * created if the region is absent, we just want to run the runnable in a synchronized way.
+   *
+   * @return true if the region is absent and the runnable is run. false if the region is present.
+   */
+  public boolean runIfAbsent(DataRegionId regionId, Runnable runnable) {
+    final AtomicBoolean result = new AtomicBoolean(false);
+    dataRegionMap.computeIfAbsent(
+        regionId,
+        k -> {
+          runnable.run();
+          result.set(true);
+          return null;
+        });
+    return result.get();
+  }
+
+  /**
+   * run the consumer if the region is present. if the region is absent, do nothing.
+   *
+   * <p>the remapping function passed to computeIfPresent always returns the region itself, so the
+   * mapping is never removed or replaced, we just want to run the consumer in a synchronized way.
+   *
+   * @return true if the region is present and the consumer is run. false if the region is absent.
+   */
+  public boolean runIfPresent(DataRegionId regionId, Consumer<DataRegion> consumer) {
+    final AtomicBoolean result = new AtomicBoolean(false);
+    dataRegionMap.computeIfPresent(
+        regionId,
+        (id, region) -> {
+          consumer.accept(region);
+          result.set(true);
+          return region;
+        });
+    return result.get();
+  }
+
   public DataRegion getDataRegion(DataRegionId regionId) {
     return dataRegionMap.get(regionId);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index e91fe3ffc87..6b03f3ddede 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -482,7 +482,7 @@ public class TsFileResource {
     timeIndex.close();
   }
 
-  TsFileProcessor getProcessor() {
+  public TsFileProcessor getProcessor() {
     return processor;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index 501cbf0016d..2db8738c557 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.agent;
 
 import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
+import org.apache.iotdb.db.pipe.agent.receiver.PipeReceiverAgent;
 import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
 import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
 
@@ -29,12 +30,14 @@ public class PipeAgent {
   private final PipePluginAgent pipePluginAgent;
   private final PipeTaskAgent pipeTaskAgent;
   private final PipeRuntimeAgent pipeRuntimeAgent;
+  private final PipeReceiverAgent pipeReceiverAgent;
 
   /** Private constructor to prevent users from creating a new instance. */
   private PipeAgent() {
     pipePluginAgent = new PipePluginAgent();
     pipeTaskAgent = new PipeTaskAgent();
     pipeRuntimeAgent = new PipeRuntimeAgent();
+    pipeReceiverAgent = new PipeReceiverAgent();
   }
 
   /** The singleton holder of PipeAgent. */
@@ -68,4 +71,13 @@ public class PipeAgent {
   public static PipeRuntimeAgent runtime() {
     return PipeAgentHolder.HANDLE.pipeRuntimeAgent;
   }
+
+  /**
+   * Get the singleton instance of PipeReceiverAgent.
+   *
+   * @return the singleton instance of PipeReceiverAgent
+   */
+  public static PipeReceiverAgent receiver() {
+    return PipeAgentHolder.HANDLE.pipeReceiverAgent;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 3474edddf3d..b0fe93b4533 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -198,7 +198,7 @@ public class PipePluginAgent {
         reflect(
             collectorParameters.getStringOrDefault(
                 PipeCollectorConstant.COLLECTOR_KEY,
-                BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName()));
+                BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName()));
   }
 
   public PipeProcessor reflectProcessor(PipeParameters processorParameters) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
similarity index 56%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
index 67a637503b2..81d6fef23d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/IoTDBThriftReceiver.java
@@ -17,13 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.agent.receiver;
 
-public class PipeConnectorConstant {
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
-  public static final String CONNECTOR_KEY = "connector";
+public interface IoTDBThriftReceiver {
 
-  private PipeConnectorConstant() {
-    throw new IllegalStateException("Utility class");
-  }
+  IoTDBThriftConnectorVersion getVersion();
+
+  TPipeTransferResp handleTransferReq(
+      TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher);
+
+  void handleExit();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
new file mode 100644
index 00000000000..ab48a9f6149
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.agent.receiver;
+
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.IoTDBThriftReceiverV1;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PipeReceiverAgent {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeReceiverAgent.class);
+
+  private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new ThreadLocal<>();
+
+  public TPipeTransferResp transfer(
+      TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+    final byte reqVersion = req.getVersion();
+    if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) {
+      return getReceiver(reqVersion).handleTransferReq(req, partitionFetcher, schemaFetcher);
+    } else {
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_VERSION_ERROR,
+              String.format("Unsupported pipe version %d", reqVersion)));
+    }
+  }
+
+  private IoTDBThriftReceiver getReceiver(byte reqVersion) {
+    if (receiverThreadLocal.get() == null) {
+      return setAndGetReceiver(reqVersion);
+    }
+
+    final byte receiverThreadLocalVersion = receiverThreadLocal.get().getVersion().getVersion();
+    if (receiverThreadLocalVersion != reqVersion) {
+      LOGGER.warn(
+          "The receiver version {} is different from the sender version {},"
+              + " the receiver will be reset to the sender version.",
+          receiverThreadLocalVersion,
+          reqVersion);
+      receiverThreadLocal.get().handleExit();
+      receiverThreadLocal.remove();
+      return setAndGetReceiver(reqVersion);
+    }
+
+    return receiverThreadLocal.get();
+  }
+
+  private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) {
+    if (reqVersion == IoTDBThriftConnectorVersion.VERSION_ONE.getVersion()) {
+      receiverThreadLocal.set(new IoTDBThriftReceiverV1());
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("Unsupported pipe version %d", reqVersion));
+    }
+    return receiverThreadLocal.get();
+  }
+
+  public void handleClientExit() {
+    final IoTDBThriftReceiver receiver = receiverThreadLocal.get();
+    if (receiver != null) {
+      receiver.handleExit();
+      receiverThreadLocal.remove();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
index c0018f8767d..5690692e165 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
@@ -19,6 +19,10 @@
 
 package org.apache.iotdb.db.pipe.config;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import java.io.File;
+
 // TODO: make these parameters configurable
 // TODO: make all pipe related parameters in one place
 // TODO: set the default value of the parameters in IoTDBDescriptor
@@ -41,6 +45,8 @@ public class PipeConfig {
   private final int realtimeCollectorPendingQueueTabletLimit =
       realtimeCollectorPendingQueueCapacity / 2;
 
+  private final int readFileBufferSize = 8388608;
+
   public int getDefaultRingBufferSize() {
     return defaultRingBufferSize;
   }
@@ -57,6 +63,16 @@ public class PipeConfig {
     return realtimeCollectorPendingQueueTabletLimit;
   }
 
+  public String getReceiveFileDir() {
+    return IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+        + File.separator
+        + "pipe"; // TODO: replace with resource manager
+  }
+
+  public int getReadFileBufferSize() {
+    return readFileBufferSize;
+  }
+
   /////////////////////////////// Singleton ///////////////////////////////
 
   private PipeConfig() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
index 67a637503b2..7b88c56ef20 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
@@ -23,6 +23,9 @@ public class PipeConnectorConstant {
 
   public static final String CONNECTOR_KEY = "connector";
 
+  public static final String CONNECTOR_IOTDB_IP_KEY = "connector.ip";
+  public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
+
   private PipeConnectorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index d7526fa1877..fe1c92638be 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.pipe.core.collector;
 
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
@@ -39,6 +42,8 @@ public class IoTDBDataRegionCollector implements PipeCollector {
   // TODO: support pattern in historical collector
   private final PipeHistoricalDataRegionTsFileCollector historicalCollector;
 
+  private int dataRegionId;
+
   public IoTDBDataRegionCollector(ListenableUnblockingPendingQueue<Event> collectorPendingQueue) {
     hasBeenStarted = new AtomicBoolean(false);
     realtimeCollector = new PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
@@ -47,6 +52,8 @@ public class IoTDBDataRegionCollector implements PipeCollector {
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
+    validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
+
     // TODO: require more attributes
     realtimeCollector.validate(validator);
     historicalCollector.validate(validator);
@@ -55,6 +62,8 @@ public class IoTDBDataRegionCollector implements PipeCollector {
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
+    dataRegionId = parameters.getInt(PipeCollectorConstant.DATA_REGION_KEY);
+
     realtimeCollector.customize(parameters, configuration);
     historicalCollector.customize(parameters, configuration);
   }
@@ -66,8 +75,36 @@ public class IoTDBDataRegionCollector implements PipeCollector {
     }
     hasBeenStarted.set(true);
 
-    realtimeCollector.start();
+    while (true) {
+      // try to start collectors in the data region ...
+      // first try to run if the data region exists, then try to run if it does not exist.
+      // it is uncommon for both attempts to fail; it means the data region was created
+      // between the runIfPresent and runIfAbsent calls. in this case, we need to retry.
+      if (StorageEngine.getInstance()
+              .runIfPresent(
+                  new DataRegionId(dataRegionId),
+                  (dataRegion -> {
+                    dataRegion.writeLock(
+                        String.format(
+                            "Pipe: starting %s", IoTDBDataRegionCollector.class.getName()));
+                    try {
+                      startHistoricalCollectorAndRealtimeCollector();
+                    } finally {
+                      dataRegion.writeUnlock();
+                    }
+                  }))
+          || StorageEngine.getInstance()
+              .runIfAbsent(
+                  new DataRegionId(dataRegionId),
+                  this::startHistoricalCollectorAndRealtimeCollector)) {
+        return;
+      }
+    }
+  }
+
+  public void startHistoricalCollectorAndRealtimeCollector() {
     historicalCollector.start();
+    realtimeCollector.start();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 4559ca8a537..ed0d37c6c14 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -56,6 +56,11 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
   public synchronized void start() {
     final DataRegion dataRegion =
         StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId));
+    if (dataRegion == null) {
+      pendingQueue = new ArrayDeque<>();
+      return;
+    }
+
     dataRegion.writeLock("Pipe: collect historical TsFile");
     try {
       dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -66,12 +71,16 @@ public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
         pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + tsFileManager.size(false));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(true).stream()
-                .map(o -> new PipeTsFileInsertionEvent(o.getTsFile()))
+                .map(PipeTsFileInsertionEvent::new)
                 .collect(Collectors.toList()));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(false).stream()
-                .map(o -> new PipeTsFileInsertionEvent(o.getTsFile()))
+                .map(PipeTsFileInsertionEvent::new)
                 .collect(Collectors.toList()));
+        pendingQueue.forEach(
+            event ->
+                event.increaseReferenceCount(
+                    PipeHistoricalDataRegionTsFileCollector.class.getName()));
       } finally {
         tsFileManager.readUnlock();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index ab7b705ce87..67d461e8c8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
-import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
 
@@ -123,7 +122,6 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
       }
 
       collectEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
-
       if (suppliedEvent != null) {
         return suppliedEvent;
       }
@@ -144,7 +142,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
                 (state.equals(TsFileEpoch.State.EMPTY)) ? TsFileEpoch.State.USING_TABLET : state);
 
     if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TABLET)) {
-      if (event.increaseReferenceCount(PipeProcessorSubtask.class.getName())) {
+      if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName())) {
         return event.getEvent();
       } else {
         // if the event's reference count can not be increased, it means the data represented by
@@ -174,7 +172,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
             });
 
     if (event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) {
-      if (event.increaseReferenceCount(PipeProcessorSubtask.class.getName())) {
+      if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName())) {
         return event.getEvent();
       } else {
         // if the event's reference count can not be increased, it means the data represented by
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
index 4fc03f371cd..2d805310f30 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.core.collector.realtime.assigner;
 
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
-import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionHybridCollector;
 import org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
 import org.apache.iotdb.db.pipe.core.collector.realtime.matcher.PipeDataRegionMatcher;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -54,7 +53,7 @@ public class PipeDataRegionAssigner {
         .match(event)
         .forEach(
             collector -> {
-              event.increaseReferenceCount(PipeRealtimeDataRegionHybridCollector.class.getName());
+              event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
               collector.collect(event);
             });
     event.gcSchemaInfo();
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
index bfa764c3711..778a583f1c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -81,9 +81,7 @@ public class PipeInsertionDataNodeListener {
       return;
     }
 
-    assigner.publishToAssign(
-        PipeRealtimeCollectEventFactory.createCollectEvent(
-            tsFileResource.getTsFile(), tsFileResource));
+    assigner.publishToAssign(PipeRealtimeCollectEventFactory.createCollectEvent(tsFileResource));
   }
 
   // TODO: check whether the method is called on the right place.
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/IoTDBThriftConnectorClient.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/IoTDBThriftConnectorClient.java
new file mode 100644
index 00000000000..16ebfb2d74d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/IoTDBThriftConnectorClient.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb;
+
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+public class IoTDBThriftConnectorClient extends IClientRPCService.Client
+    implements ThriftClient, AutoCloseable {
+
+  public IoTDBThriftConnectorClient(ThriftClientProperty property, String ipAddress, int port)
+      throws TTransportException {
+    super(
+        property
+            .getProtocolFactory()
+            .getProtocol(
+                RpcTransportFactory.INSTANCE.getTransport(
+                    new TSocket(
+                        TConfigurationConst.defaultTConfiguration,
+                        ipAddress,
+                        port,
+                        property.getConnectionTimeoutMs()))));
+    getInputProtocol().getTransport().open();
+  }
+
+  @Override
+  public void close() throws Exception {
+    invalidate();
+  }
+
+  @Override
+  public void invalidate() {
+    if (getInputProtocol().getTransport().isOpen()) {
+      getInputProtocol().getTransport().close();
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    invalidate();
+  }
+
+  @Override
+  public boolean printLogWhenEncounterException() {
+    return true;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/IoTDBThriftConnectorVersion.java
similarity index 74%
copy from server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
copy to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/IoTDBThriftConnectorVersion.java
index 67a637503b2..d05fd374544 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConnectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/IoTDBThriftConnectorVersion.java
@@ -17,13 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.config;
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb;
 
-public class PipeConnectorConstant {
+public enum IoTDBThriftConnectorVersion {
+  VERSION_ONE((byte) 1),
+  ;
 
-  public static final String CONNECTOR_KEY = "connector";
+  private final byte version;
 
-  private PipeConnectorConstant() {
-    throw new IllegalStateException("Utility class");
+  IoTDBThriftConnectorVersion(byte type) {
+    this.version = type;
+  }
+
+  public byte getVersion() {
+    return version;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
new file mode 100644
index 00000000000..32408769579
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1;
+
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorClient;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.reponse.PipeTransferFilePieceResp;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFilePieceReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFileSealReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+
+public class IoTDBThriftConnectorV1 implements PipeConnector {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftConnectorV1.class);
+
+  private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
+  private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+  private String ipAddress;
+  private int port;
+
+  private IoTDBThriftConnectorClient client;
+
+  public IoTDBThriftConnectorV1() {}
+
+  /**
+   * Requires that the connector attributes contain both the target IoTDB IP and port keys.
+   *
+   * @param validator validator over the connector attributes
+   * @throws Exception declared by the overridden method; NOTE(review): presumably raised by the
+   *     validator when a required attribute is missing - confirm
+   */
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    validator
+        .validateRequiredAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY)
+        .validateRequiredAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY);
+  }
+
+  /**
+   * Reads the receiver's IP address and port from the connector attributes and remembers them
+   * for later connection attempts.
+   *
+   * @param parameters connector attributes holding the IP and port keys
+   * @param configuration runtime configuration; not used by this connector
+   */
+  @Override
+  public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
+      throws Exception {
+    ipAddress = parameters.getString(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY);
+    port = parameters.getInt(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY);
+  }
+
+  /**
+   * (Re)creates the client for the configured address and sends the handshake request.
+   *
+   * <p>A previously created client is closed first. The handshake carries this node's timestamp
+   * precision; a non-success status in the response is reported as a {@link PipeException}.
+   *
+   * @throws Exception if closing or creating the client fails, the request fails, or the
+   *     receiver answers with a non-success status
+   */
+  @Override
+  public void handshake() throws Exception {
+    if (client != null) {
+      client.close();
+    }
+
+    final ThriftClientProperty clientProperty =
+        new ThriftClientProperty.Builder()
+            .setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
+            .setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
+            .build();
+    client = new IoTDBThriftConnectorClient(clientProperty, ipAddress, port);
+
+    final TPipeTransferResp handshakeResp =
+        client.pipeTransfer(
+            PipeTransferHandshakeReq.toTPipeTransferReq(IOTDB_CONFIG.getTimestampPrecision()));
+    if (handshakeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return;
+    }
+    throw new PipeException(
+        String.format("Handshake error, result status %s.", handshakeResp.status));
+  }
+
+  /** Intentionally empty: this connector performs no work on heartbeat. */
+  @Override
+  public void heartbeat() throws Exception {}
+
+  /**
+   * Sends a tablet insertion event to the receiver.
+   *
+   * <p>Only {@link PipeTabletInsertionEvent} is supported. A thrift failure is logged and rethrown
+   * as a {@link PipeConnectionException}, which signals that a reconnect should be attempted.
+   *
+   * @param tabletInsertionEvent event to transfer
+   * @throws Exception {@link NotImplementedException} for unsupported event types, {@link
+   *     PipeConnectionException} on thrift errors, or whatever the transfer itself raises
+   */
+  @Override
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
+    // TODO: support more TabletInsertionEvent
+    // a PipeProcessor may hand over a different TabletInsertionEvent implementation
+    if (tabletInsertionEvent instanceof PipeTabletInsertionEvent) {
+      try {
+        doTransfer((PipeTabletInsertionEvent) tabletInsertionEvent);
+      } catch (TException e) {
+        LOGGER.error(
+            "Network error when transfer tablet insertion event: {}.", tabletInsertionEvent, e);
+        // the connection may be broken; PipeConnectionException asks for a reconnect
+        throw new PipeConnectionException(
+            "Network error when transfer tablet insertion event.", e);
+      }
+      return;
+    }
+
+    throw new NotImplementedException(
+        "IoTDBThriftConnectorV1 only support PipeTabletInsertionEvent.");
+  }
+
+  /**
+   * Wraps the event's insert node into a transfer request, sends it and checks the answer.
+   *
+   * @param pipeTabletInsertionEvent event whose insert node is transferred
+   * @throws PipeException if the receiver answers with a non-success status
+   * @throws TException if the thrift call fails
+   */
+  private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
+      throws PipeException, TException {
+    final TPipeTransferResp transferResp =
+        client.pipeTransfer(
+            PipeTransferInsertNodeReq.toTPipeTransferReq(pipeTabletInsertionEvent.getInsertNode()));
+
+    if (transferResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return;
+    }
+    throw new PipeException(
+        String.format(
+            "Transfer tablet insertion event %s error, result status %s",
+            pipeTabletInsertionEvent, transferResp.status));
+  }
+
+  /**
+   * Sends a TsFile insertion event to the receiver.
+   *
+   * <p>Only {@link PipeTsFileInsertionEvent} is supported. A thrift failure is logged and rethrown
+   * as a {@link PipeConnectionException}, which signals that a reconnect should be attempted.
+   *
+   * @param tsFileInsertionEvent event to transfer
+   * @throws Exception {@link NotImplementedException} for unsupported event types, {@link
+   *     PipeConnectionException} on thrift errors, or whatever the transfer itself raises
+   */
+  @Override
+  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
+    // TODO: support more TsFileInsertionEvent
+    // a PipeProcessor may hand over a different TsFileInsertionEvent implementation
+    if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
+      try {
+        doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
+      } catch (TException e) {
+        LOGGER.error(
+            "Network error when transfer tsfile insertion event: {}.", tsFileInsertionEvent, e);
+        // the connection may be broken; PipeConnectionException asks for a reconnect
+        throw new PipeConnectionException(
+            "Network error when transfer tsfile insertion event.", e);
+      }
+      return;
+    }
+
+    throw new NotImplementedException(
+        "IoTDBThriftConnectorV1 only support PipeTsFileInsertionEvent.");
+  }
+
+  /**
+   * Sends the event's TsFile to the receiver piece by piece and then sends the seal request that
+   * marks the file as completely transferred.
+   *
+   * <p>Each piece carries the file name and the offset it starts at. When the receiver answers
+   * with {@code PIPE_TRANSFER_FILE_OFFSET_RESET}, reading continues from the offset the receiver
+   * reports instead of the next sequential position.
+   *
+   * @param pipeTsFileInsertionEvent event providing the TsFile; {@code waitForTsFileClose()} is
+   *     called on it before the file is read
+   * @throws PipeException if a piece or the seal request is answered with an error status
+   * @throws TException if a thrift call fails
+   * @throws InterruptedException declared for {@code waitForTsFileClose()}
+   * @throws IOException if reading the file or building a request fails
+   */
+  private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
+      throws PipeException, TException, InterruptedException, IOException {
+    pipeTsFileInsertionEvent.waitForTsFileClose();
+
+    final File tsFile = pipeTsFileInsertionEvent.getTsFile();
+
+    // step 1: send the file content, at most one buffer per request
+    final int readFileBufferSize = PipeConfig.getInstance().getReadFileBufferSize();
+    final byte[] readBuffer = new byte[readFileBufferSize];
+    long position = 0;
+    try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
+      int readLength;
+      while ((readLength = reader.read(readBuffer)) != -1) {
+        // a short read is copied so that only the bytes actually read are sent
+        final byte[] piece =
+            readLength == readFileBufferSize
+                ? readBuffer
+                : Arrays.copyOfRange(readBuffer, 0, readLength);
+        final PipeTransferFilePieceResp resp =
+            PipeTransferFilePieceResp.fromTPipeTransferResp(
+                client.pipeTransfer(
+                    PipeTransferFilePieceReq.toTPipeTransferReq(
+                        tsFile.getName(), position, piece)));
+        position += readLength;
+
+        final int statusCode = resp.getStatus().getCode();
+
+        // only expected after a broken connection was re-established: the receiver then tells
+        // the connector which offset it has actually written up to
+        if (statusCode == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
+          position = resp.getEndWritingOffset();
+          reader.seek(position);
+          LOGGER.info(String.format("Redirect file position to %s.", position));
+          continue;
+        }
+
+        if (statusCode != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          throw new PipeException(
+              String.format("Transfer file %s error, result status %s.", tsFile, resp.getStatus()));
+        }
+      }
+    }
+
+    // step 2: send the seal signal, telling the receiver that the file is complete
+    final TPipeTransferResp sealResp =
+        client.pipeTransfer(
+            PipeTransferFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()));
+    if (sealResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return;
+    }
+    throw new PipeException(
+        String.format("Seal file %s error, result status %s.", tsFile, sealResp.getStatus()));
+  }
+
+  /**
+   * Deletion events are not supported by this connector.
+   *
+   * @throws NotImplementedException always
+   */
+  @Override
+  public void transfer(DeletionEvent deletionEvent) throws Exception {
+    throw new NotImplementedException("Not implement for deletion event.");
+  }
+
+  /** Closes the client if one has been created; otherwise does nothing. */
+  @Override
+  public void close() throws Exception {
+    if (client == null) {
+      return;
+    }
+    client.close();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
new file mode 100644
index 00000000000..2cbcc748db7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver;
+import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.reponse.PipeTransferFilePieceResp;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFilePieceReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFileSealReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
+import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftReceiverV1.class);
+
+  private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
+  private static final String RECEIVE_DIR = PipeConfig.getInstance().getReceiveFileDir();
+
+  private File writingFile;
+  private RandomAccessFile writingFileWriter;
+
+  /**
+   * Dispatches a transfer request to the handler that matches its request type.
+   *
+   * @param req raw request; its type decides which handler decodes and processes it
+   * @param partitionFetcher passed on to statement execution
+   * @param schemaFetcher passed on to statement execution
+   * @return the handler's response, or a {@code PIPE_TYPE_ERROR} response for an unknown type
+   */
+  @Override
+  public synchronized TPipeTransferResp handleTransferReq(
+      TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+    final short rawRequestType = req.getType();
+    if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
+      final PipeRequestType requestType = PipeRequestType.valueOf(rawRequestType);
+      if (requestType == PipeRequestType.HANDSHAKE) {
+        return handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(req));
+      }
+      if (requestType == PipeRequestType.TRANSFER_INSERT_NODE) {
+        return handleTransferInsertNode(
+            PipeTransferInsertNodeReq.fromTPipeTransferReq(req), partitionFetcher, schemaFetcher);
+      }
+      if (requestType == PipeRequestType.TRANSFER_FILE_PIECE) {
+        return handleTransferFilePiece(PipeTransferFilePieceReq.fromTPipeTransferReq(req));
+      }
+      if (requestType == PipeRequestType.TRANSFER_FILE_SEAL) {
+        return handleTransferFileSeal(
+            PipeTransferFileSealReq.fromTPipeTransferReq(req), partitionFetcher, schemaFetcher);
+      }
+    }
+
+    // no handler for this type: this receiver cannot process the request, possibly because the
+    // sender speaks a version this receiver is not compatible with
+    final String message = String.format("Unknown transfer type %s.", rawRequestType);
+    return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TYPE_ERROR, message));
+  }
+
+  private TPipeTransferResp handleTransferHandshake(PipeTransferHandshakeReq req) {
+    if (!IOTDB_CONFIG.getTimestampPrecision().equals(req.getTimestampPrecision())) {
+      String msg =
+          String.format(
+              "IoTDB receiver's timestamp precision %s, connector's timestamp precision %s. validation fails.",
+              IOTDB_CONFIG.getTimestampPrecision(), req.getTimestampPrecision());
+      LOGGER.warn(msg);
+      return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_HANDSHAKE_ERROR, msg));
+    }
+
+    LOGGER.info("Handshake successfully.");
+    return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
+  }
+
+  private TPipeTransferResp handleTransferInsertNode(
+      PipeTransferInsertNodeReq req,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
+    return new TPipeTransferResp(
+        executeStatement(req.constructStatement(), partitionFetcher, schemaFetcher));
+  }
+
+  /**
+   * Runs a statement through the coordinator under a freshly requested query id.
+   *
+   * <p>A non-success status is logged as a warning and still returned to the caller.
+   * NOTE(review): the {@code null} and {@code ""} arguments are presumably the session and the
+   * SQL text - confirm against {@code Coordinator#execute}.
+   *
+   * @param statement statement to execute
+   * @param partitionFetcher partition fetcher handed to the coordinator
+   * @param schemaFetcher schema fetcher handed to the coordinator
+   * @return status of the execution result
+   */
+  private TSStatus executeStatement(
+      Statement statement, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+    final long queryId = SessionManager.getInstance().requestQueryId();
+    final ExecutionResult executionResult =
+        Coordinator.getInstance()
+            .execute(
+                statement,
+                queryId,
+                null,
+                "",
+                partitionFetcher,
+                schemaFetcher,
+                IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+
+    final TSStatus status = executionResult.status;
+    if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.warn(
+          "failed to execute statement, statement: {}, result status is: {}", statement, status);
+    }
+    return status;
+  }
+
+  /**
+   * Appends one received file piece to the file that is currently being written.
+   *
+   * <p>The piece is only accepted when the sender's start offset equals the current length of
+   * the receiving file. Otherwise nothing is written and the sender is told, via {@code
+   * PIPE_TRANSFER_FILE_OFFSET_RESET}, to continue from that length.
+   *
+   * @param req file name, start offset and bytes of the piece
+   * @return success with the file length after the write, an offset-reset response with the
+   *     current file length, or a {@code PIPE_TRANSFER_FILE_ERROR} response if anything fails
+   */
+  private TPipeTransferResp handleTransferFilePiece(PipeTransferFilePieceReq req) {
+    try {
+      updateWritingFileIfNeeded(req.getFileName());
+
+      if (!isWritingFileOffsetCorrect(req.getStartWritingOffset())) {
+        final long currentLength = writingFileWriter.length();
+        return PipeTransferFilePieceResp.toTPipeTransferResp(
+            RpcUtils.getStatus(
+                TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET,
+                String.format(
+                    "request sender reset file reader's offset from %s to %s.",
+                    req.getStartWritingOffset(), currentLength)),
+            currentLength);
+      }
+
+      // Position the writer explicitly. The offset check above compares against the file length,
+      // but a writer opened on an already existing file starts at file pointer 0; writing
+      // without seeking would then overwrite the beginning of the file instead of appending.
+      writingFileWriter.seek(req.getStartWritingOffset());
+      writingFileWriter.write(req.getFilePiece());
+      return PipeTransferFilePieceResp.toTPipeTransferResp(
+          RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
+    } catch (Exception e) {
+      LOGGER.warn(String.format("failed to write file piece from req %s.", req), e);
+      final TSStatus status =
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
+              String.format("failed to write file piece, because %s", e.getMessage()));
+      try {
+        return PipeTransferFilePieceResp.toTPipeTransferResp(
+            status, PipeTransferFilePieceResp.ERROR_END_OFFSET);
+      } catch (IOException ex) {
+        // the offset could not be serialized: fall back to a response without a body
+        return PipeTransferFilePieceResp.toTPipeTransferResp(status);
+      }
+    }
+  }
+
+  /**
+   * Makes sure the writing file and its writer refer to the file named {@code fileName}.
+   *
+   * <p>Nothing happens when that file is already the one being written. Otherwise the previous
+   * writer is closed, the previous file is deleted, and a writer for the new file is opened in
+   * the receive directory (which is created when missing).
+   *
+   * @param fileName bare name of the file being transferred, as sent by the remote sender
+   * @throws IOException if the name is not a bare file name, or closing/opening a writer fails
+   */
+  private void updateWritingFileIfNeeded(String fileName) throws IOException {
+    if (isFileExistedAndNameCorrect(fileName)) {
+      return;
+    }
+
+    // The name comes from a remote sender: accept a bare file name only, so that the target can
+    // never resolve to a location outside the receive directory.
+    if (fileName == null
+        || fileName.isEmpty()
+        || ".".equals(fileName)
+        || "..".equals(fileName)
+        || !new File(fileName).getName().equals(fileName)) {
+      throw new IOException(String.format("illegal transferring file name %s.", fileName));
+    }
+
+    if (writingFileWriter != null) {
+      writingFileWriter.close();
+      writingFileWriter = null;
+    }
+    if (writingFile != null) {
+      if (writingFile.exists()) {
+        if (writingFile.delete()) {
+          LOGGER.info(String.format("original file %s was deleted.", writingFile.getPath()));
+        } else {
+          LOGGER.warn(
+              String.format("failed to delete original file %s.", writingFile.getPath()));
+        }
+      }
+      writingFile = null;
+    }
+
+    final File receiveDir = new File(RECEIVE_DIR);
+    if (!receiveDir.exists() && !receiveDir.mkdirs()) {
+      LOGGER.warn(String.format("failed to create receive dir %s.", receiveDir.getPath()));
+    }
+
+    // Publish the file only together with a successfully opened writer; otherwise a failed open
+    // would leave a "current" file without a writer behind.
+    final File newWritingFile = new File(RECEIVE_DIR, fileName);
+    writingFileWriter = new RandomAccessFile(newWritingFile, "rw");
+    writingFile = newWritingFile;
+    LOGGER.info(String.format("start to write transferring file %s.", writingFile.getPath()));
+  }
+
+  /**
+   * Returns whether a writing file is set and its name equals {@code fileName}. Only the file
+   * reference is checked, not whether the file exists on disk.
+   */
+  private boolean isFileExistedAndNameCorrect(String fileName) {
+    return writingFile != null && writingFile.getName().equals(fileName);
+  }
+
+  /**
+   * Returns whether {@code offset} equals the current length of the file behind the writer.
+   *
+   * @throws IOException if the length cannot be read from the writer
+   */
+  private boolean isWritingFileOffsetCorrect(long offset) throws IOException {
+    return writingFileWriter.length() == offset;
+  }
+
+  /**
+   * Finishes a file transfer: verifies the received file, closes it and loads it.
+   *
+   * <p>The seal is rejected with {@code PIPE_TRANSFER_FILE_ERROR} when no file is being written,
+   * when the name differs from the file being written, or when the received length differs from
+   * the length announced by the sender. Once the writer is closed, the receiver forgets the file
+   * so that the next transfer starts from a clean state; a file whose load failed is deleted.
+   *
+   * @param req name and expected length of the file to seal
+   * @param partitionFetcher passed on to statement execution
+   * @param schemaFetcher passed on to statement execution
+   * @return response holding the status of the load, or an error response
+   */
+  private TPipeTransferResp handleTransferFileSeal(
+      PipeTransferFileSealReq req,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher) {
+    // kept for error messages, because the field is cleared once the writer is closed
+    final File sealingFile = writingFile;
+    try {
+      if (!isWritingFileAvailable()) {
+        return new TPipeTransferResp(
+            RpcUtils.getStatus(
+                TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
+                String.format(
+                    "failed to seal file, because writing file %s is not available.",
+                    req.getFileName())));
+      }
+
+      if (!isFileExistedAndNameCorrect(req.getFileName())) {
+        return new TPipeTransferResp(
+            RpcUtils.getStatus(
+                TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
+                String.format(
+                    "failed to seal file %s, but writing file is %s.",
+                    req.getFileName(), writingFile)));
+      }
+
+      if (!isWritingFileOffsetCorrect(req.getFileLength())) {
+        return new TPipeTransferResp(
+            RpcUtils.getStatus(
+                TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
+                String.format(
+                    "failed to seal file because the length of file is not correct. "
+                        + "the original file has length %s, but receiver file has length %s.",
+                    req.getFileLength(), writingFileWriter.length())));
+      }
+
+      // A closed writer must never be reused. Drop both references so that a later piece request
+      // (even one for the same file name) reopens the file instead of failing on the closed
+      // writer, and so that handleExit() does not try to delete an already loaded file.
+      writingFileWriter.close();
+      writingFileWriter = null;
+      writingFile = null;
+
+      final LoadTsFileStatement statement = new LoadTsFileStatement(sealingFile.getAbsolutePath());
+      statement.setDeleteAfterLoad(true);
+      statement.setVerifySchema(true);
+      statement.setAutoCreateDatabase(false);
+      final TSStatus loadStatus = executeStatement(statement, partitionFetcher, schemaFetcher);
+
+      // The file is no longer tracked, so a file that failed to load must be cleaned up here.
+      if (loadStatus.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && sealingFile.exists()
+          && !sealingFile.delete()) {
+        LOGGER.warn(
+            String.format("failed to delete file %s after load failure.", sealingFile.getPath()));
+      }
+      return new TPipeTransferResp(loadStatus);
+    } catch (IOException e) {
+      LOGGER.warn(String.format("failed to seal file %s from req %s.", sealingFile, req), e);
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
+              String.format("failed to seal file %s because %s", sealingFile, e.getMessage())));
+    }
+  }
+
+  /** Returns whether a writing file is set, exists on disk, and a writer for it is set. */
+  private boolean isWritingFileAvailable() {
+    return writingFile != null && writingFile.exists() && writingFileWriter != null;
+  }
+
+  /**
+   * Releases the resources of an unfinished file transfer.
+   *
+   * <p>The writer is closed and the partially received file is deleted. A failure to close the
+   * writer is only logged and does not prevent the deletion attempt. A file that no longer
+   * exists (a sealed file is loaded with delete-after-load) is not reported as an error.
+   */
+  @Override
+  public synchronized void handleExit() {
+    if (writingFileWriter != null) {
+      try {
+        writingFileWriter.close();
+      } catch (IOException e) {
+        LOGGER.warn("IoTDBThriftReceiverV1#handleExit: meeting errors on handleExit().", e);
+      }
+      writingFileWriter = null;
+    }
+
+    if (writingFile != null) {
+      if (writingFile.exists() && !writingFile.delete()) {
+        LOGGER.warn(
+            String.format(
+                "IoTDBThriftReceiverV1#handleExit: delete file %s error.",
+                writingFile.getPath()));
+      }
+      writingFile = null;
+    }
+  }
+
+  /** Returns {@link IoTDBThriftConnectorVersion#VERSION_ONE}. */
+  @Override
+  public IoTDBThriftConnectorVersion getVersion() {
+    return IoTDBThriftConnectorVersion.VERSION_ONE;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java
new file mode 100644
index 00000000000..982243cd87e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Request types of the version-one pipe transfer protocol, each identified by a short code. */
+public enum PipeRequestType {
+  HANDSHAKE((short) 1),
+
+  TRANSFER_INSERT_NODE((short) 2),
+
+  TRANSFER_FILE_PIECE((short) 3),
+  TRANSFER_FILE_SEAL((short) 4),
+  ;
+
+  /** Lookup table from code to request type, filled once when the enum is initialized. */
+  private static final Map<Short, PipeRequestType> TYPE_MAP = new HashMap<>();
+
+  static {
+    for (final PipeRequestType requestType : values()) {
+      TYPE_MAP.put(requestType.type, requestType);
+    }
+  }
+
+  private final short type;
+
+  PipeRequestType(short type) {
+    this.type = type;
+  }
+
+  /** Returns the code of this request type. */
+  public short getType() {
+    return type;
+  }
+
+  /** Returns whether {@code type} is the code of one of the request types. */
+  public static boolean isValidatedRequestType(short type) {
+    return TYPE_MAP.containsKey(type);
+  }
+
+  /** Returns the request type with the given code, or {@code null} if there is none. */
+  public static PipeRequestType valueOf(short type) {
+    return TYPE_MAP.get(type);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/reponse/PipeTransferFilePieceResp.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/reponse/PipeTransferFilePieceResp.java
new file mode 100644
index 00000000000..56c3b8a4cd5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/reponse/PipeTransferFilePieceResp.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.reponse;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Response to a file piece transfer. Besides the status it carries the offset up to which the
+ * receiver has written the file, serialized as a long in the response body.
+ */
+public class PipeTransferFilePieceResp extends TPipeTransferResp {
+
+  /** End offset reported when the piece could not be written. */
+  public static final long ERROR_END_OFFSET = -1;
+
+  private long endWritingOffset;
+
+  private PipeTransferFilePieceResp() {}
+
+  /** Returns the end offset; it stays 0 when the response was built or decoded without a body. */
+  public long getEndWritingOffset() {
+    return endWritingOffset;
+  }
+
+  /**
+   * Builds a response with the given status whose body holds the serialized end offset.
+   *
+   * @throws IOException if the offset cannot be serialized
+   */
+  public static PipeTransferFilePieceResp toTPipeTransferResp(
+      TSStatus status, long endWritingOffset) throws IOException {
+    final PipeTransferFilePieceResp resp = new PipeTransferFilePieceResp();
+    resp.status = status;
+    resp.endWritingOffset = endWritingOffset;
+
+    try (PublicBAOS bodyBuffer = new PublicBAOS();
+        DataOutputStream bodyStream = new DataOutputStream(bodyBuffer)) {
+      ReadWriteIOUtils.write(endWritingOffset, bodyStream);
+      resp.body = ByteBuffer.wrap(bodyBuffer.getBuf(), 0, bodyBuffer.size());
+    }
+
+    return resp;
+  }
+
+  /** Builds a response that only carries a status and has no body. */
+  public static PipeTransferFilePieceResp toTPipeTransferResp(TSStatus status) {
+    final PipeTransferFilePieceResp resp = new PipeTransferFilePieceResp();
+    resp.status = status;
+    return resp;
+  }
+
+  /**
+   * Decodes a generic transfer response. The end offset is read from the body when one is set;
+   * reading consumes the long from the shared body buffer.
+   */
+  public static PipeTransferFilePieceResp fromTPipeTransferResp(TPipeTransferResp transferResp) {
+    final PipeTransferFilePieceResp resp = new PipeTransferFilePieceResp();
+    resp.status = transferResp.status;
+
+    if (!transferResp.isSetBody()) {
+      return resp;
+    }
+
+    resp.endWritingOffset = ReadWriteIOUtils.readLong(transferResp.body);
+    resp.body = transferResp.body;
+    return resp;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferFilePieceReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferFilePieceReq.java
new file mode 100644
index 00000000000..6f6266fa1e9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferFilePieceReq.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request;
+
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.PipeRequestType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Request that transfers one piece of a file. The body holds, in this order, the file name, the
+ * offset at which the piece starts and the bytes of the piece.
+ */
+public class PipeTransferFilePieceReq extends TPipeTransferReq {
+
+  private String fileName;
+  private long startWritingOffset;
+  private byte[] filePiece;
+
+  private PipeTransferFilePieceReq() {}
+
+  /** Returns the name of the file this piece belongs to. */
+  public String getFileName() {
+    return fileName;
+  }
+
+  /** Returns the offset in the file at which this piece starts. */
+  public long getStartWritingOffset() {
+    return startWritingOffset;
+  }
+
+  /** Returns the bytes of this piece (the internal array, not a copy). */
+  public byte[] getFilePiece() {
+    return filePiece;
+  }
+
+  /**
+   * Builds a version-one {@code TRANSFER_FILE_PIECE} request and serializes its fields into the
+   * body.
+   *
+   * @throws IOException if the fields cannot be serialized
+   */
+  public static PipeTransferFilePieceReq toTPipeTransferReq(
+      String fileName, long startWritingOffset, byte[] filePiece) throws IOException {
+    final PipeTransferFilePieceReq req = new PipeTransferFilePieceReq();
+    req.fileName = fileName;
+    req.startWritingOffset = startWritingOffset;
+    req.filePiece = filePiece;
+
+    req.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    req.type = PipeRequestType.TRANSFER_FILE_PIECE.getType();
+
+    try (final PublicBAOS bodyBuffer = new PublicBAOS();
+        final DataOutputStream bodyStream = new DataOutputStream(bodyBuffer)) {
+      ReadWriteIOUtils.write(fileName, bodyStream);
+      ReadWriteIOUtils.write(startWritingOffset, bodyStream);
+      ReadWriteIOUtils.write(new Binary(filePiece), bodyStream);
+      req.body = ByteBuffer.wrap(bodyBuffer.getBuf(), 0, bodyBuffer.size());
+    }
+
+    return req;
+  }
+
+  /**
+   * Decodes a generic transfer request. The fields are read from the body in the order they
+   * were written; reading consumes them from the shared body buffer.
+   */
+  public static PipeTransferFilePieceReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferFilePieceReq req = new PipeTransferFilePieceReq();
+
+    final ByteBuffer source = transferReq.body;
+    req.fileName = ReadWriteIOUtils.readString(source);
+    req.startWritingOffset = ReadWriteIOUtils.readLong(source);
+    req.filePiece = ReadWriteIOUtils.readBinary(source).getValues();
+
+    req.version = transferReq.version;
+    req.type = transferReq.type;
+    req.body = transferReq.body;
+
+    return req;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferFileSealReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferFileSealReq.java
new file mode 100644
index 00000000000..935131db950
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferFileSealReq.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request;
+
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.PipeRequestType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+/** Pipe transfer request of type TRANSFER_FILE_SEAL that carries a file name and a file length. */
+public class PipeTransferFileSealReq extends TPipeTransferReq {
+  // decoded view of the serialized body: file name first, then file length
+  private String fileName;
+  private long fileLength;
+  /** Private: instances are obtained through the static factory methods of this class. */
+  private PipeTransferFileSealReq() {}
+  /** Returns the file name carried by this request. */
+  public String getFileName() {
+    return fileName;
+  }
+  /** Returns the file length carried by this request. */
+  public long getFileLength() {
+    return fileLength;
+  }
+  /** Builds a request whose body holds the serialized file name followed by the file length. */
+  public static PipeTransferFileSealReq toTPipeTransferReq(String fileName, long fileLength)
+      throws IOException {
+    final PipeTransferFileSealReq fileSealReq = new PipeTransferFileSealReq();
+
+    fileSealReq.fileName = fileName;
+    fileSealReq.fileLength = fileLength;
+
+    fileSealReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    fileSealReq.type = PipeRequestType.TRANSFER_FILE_SEAL.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(fileName, outputStream);
+      ReadWriteIOUtils.write(fileLength, outputStream);
+      fileSealReq.body =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    return fileSealReq;
+  }
+  /** Decodes a seal request; reads a duplicate of the body so req.body is not consumed. */
+  public static PipeTransferFileSealReq fromTPipeTransferReq(TPipeTransferReq req) {
+    final PipeTransferFileSealReq fileSealReq = new PipeTransferFileSealReq();
+    final ByteBuffer bodyView = req.body.duplicate(); // independent position, shared content
+    fileSealReq.fileName = ReadWriteIOUtils.readString(bodyView);
+    fileSealReq.fileLength = ReadWriteIOUtils.readLong(bodyView);
+
+    fileSealReq.version = req.version;
+    fileSealReq.type = req.type;
+    fileSealReq.body = req.body;
+
+    return fileSealReq;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferHandshakeReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferHandshakeReq.java
new file mode 100644
index 00000000000..f258c5036f9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferHandshakeReq.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request;
+
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.PipeRequestType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+/** Pipe transfer request of type HANDSHAKE whose body is a serialized timestamp precision. */
+public class PipeTransferHandshakeReq extends TPipeTransferReq {
+  // decoded view of the serialized body
+  private String timestampPrecision;
+  /** Private: instances are obtained through the static factory methods of this class. */
+  private PipeTransferHandshakeReq() {}
+  /** Returns the timestamp precision carried by this request. */
+  public String getTimestampPrecision() {
+    return timestampPrecision;
+  }
+  /** Builds a handshake request whose body holds the serialized timestamp precision. */
+  public static PipeTransferHandshakeReq toTPipeTransferReq(String timestampPrecision)
+      throws IOException {
+    final PipeTransferHandshakeReq handshakeReq = new PipeTransferHandshakeReq();
+
+    handshakeReq.timestampPrecision = timestampPrecision;
+
+    handshakeReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(timestampPrecision, outputStream);
+      handshakeReq.body =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+    }
+
+    return handshakeReq;
+  }
+  /** Decodes a handshake request from the body of the given transfer request. */
+  public static PipeTransferHandshakeReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferHandshakeReq handshakeReq = new PipeTransferHandshakeReq();
+    // read from a duplicate so that transferReq.body keeps its position
+    handshakeReq.timestampPrecision = ReadWriteIOUtils.readString(transferReq.body.duplicate());
+
+    handshakeReq.version = transferReq.version;
+    handshakeReq.type = transferReq.type;
+    handshakeReq.body = transferReq.body;
+
+    return handshakeReq;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferInsertNodeReq.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferInsertNodeReq.java
new file mode 100644
index 00000000000..90af3b2fae6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferInsertNodeReq.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.PipeRequestType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+/** Pipe transfer request of type TRANSFER_INSERT_NODE whose body is a serialized InsertNode. */
+public class PipeTransferInsertNodeReq extends TPipeTransferReq {
+  // serialized into the body when sending, deserialized from the body when receiving
+  private InsertNode insertNode;
+  /** Private: instances are obtained through the static factory methods of this class. */
+  private PipeTransferInsertNodeReq() {}
+  /** Returns the insert node carried by this request. */
+  public InsertNode getInsertNode() {
+    return insertNode;
+  }
+  /** Converts the carried node into an insert statement; throws for unsupported node types. */
+  public Statement constructStatement() {
+    if (insertNode instanceof InsertRowNode) {
+      final InsertRowNode node = (InsertRowNode) insertNode;
+
+      final InsertRowStatement statement = new InsertRowStatement();
+      statement.setDevicePath(node.getDevicePath());
+      statement.setTime(node.getTime());
+      statement.setMeasurements(node.getMeasurements());
+      statement.setDataTypes(node.getDataTypes());
+      statement.setValues(node.getValues());
+      statement.setNeedInferType(true);
+      statement.setAligned(node.isAligned());
+      return statement;
+    }
+
+    if (insertNode instanceof InsertTabletNode) {
+      final InsertTabletNode node = (InsertTabletNode) insertNode;
+
+      final InsertTabletStatement statement = new InsertTabletStatement();
+      statement.setDevicePath(node.getDevicePath());
+      statement.setMeasurements(node.getMeasurements());
+      statement.setTimes(node.getTimes());
+      statement.setColumns(node.getColumns());
+      statement.setBitMaps(node.getBitMaps());
+      statement.setRowCount(node.getRowCount());
+      statement.setDataTypes(node.getDataTypes());
+      statement.setAligned(node.isAligned());
+      return statement;
+    }
+
+    throw new UnsupportedOperationException(
+        String.format(
+            "unknown InsertNode type %s when constructing statement from insert node.",
+            insertNode));
+  }
+  /** Builds a request whose body is the given insert node serialized to a byte buffer. */
+  public static PipeTransferInsertNodeReq toTPipeTransferReq(InsertNode insertNode) {
+    final PipeTransferInsertNodeReq req = new PipeTransferInsertNodeReq();
+
+    req.insertNode = insertNode;
+
+    req.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    req.type = PipeRequestType.TRANSFER_INSERT_NODE.getType();
+    req.body = insertNode.serializeToByteBuffer();
+
+    return req;
+  }
+  /** Decodes an insert node request from the body of the given transfer request. */
+  public static PipeTransferInsertNodeReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferInsertNodeReq insertNodeReq = new PipeTransferInsertNodeReq();
+    // deserialize from a duplicate so that transferReq.body keeps its position
+    insertNodeReq.insertNode = (InsertNode) PlanNodeType.deserialize(transferReq.body.duplicate());
+
+    insertNodeReq.version = transferReq.version;
+    insertNodeReq.type = transferReq.type;
+    insertNodeReq.body = transferReq.body;
+
+    return insertNodeReq;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
index 9ed53ae310e..a805844f855 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskLifeCycle.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskLifeCycle.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.connector;
+package org.apache.iotdb.db.pipe.core.connector.manager;
 
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
index 33a298c5d02..06d9942f525 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
@@ -17,9 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.connector;
+package org.apache.iotdb.db.pipe.core.connector.manager;
 
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.IoTDBThriftConnectorV1;
 import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
@@ -46,15 +49,24 @@ public class PipeConnectorSubtaskManager {
         new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
 
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
-      // 1. construct, validate and customize PipeConnector
+      // TODO: construct all PipeConnector with the same reflection method, avoid using if-else
+      // 1. construct, validate and customize PipeConnector, and then handshake (create connection)
+      // with the target
       final PipeConnector pipeConnector =
-          PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
+          pipeConnectorParameters
+                  .getStringOrDefault(
+                      PipeConnectorConstant.CONNECTOR_KEY,
+                      BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+                  .equals(BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+              ? new IoTDBThriftConnectorV1()
+              : PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
       try {
         pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters));
         final PipeConnectorRuntimeConfiguration runtimeConfiguration =
             new PipeConnectorRuntimeConfiguration();
         pipeConnector.customize(pipeConnectorParameters, runtimeConfiguration);
         // TODO: use runtimeConfiguration to configure PipeConnector
+        pipeConnector.handshake();
       } catch (Exception e) {
         throw new PipeManagementException(
             "Failed to construct PipeConnector, because of " + e.getMessage(), e);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index e97e0da9b18..62c599b66c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -41,6 +41,10 @@ public class PipeTabletInsertionEvent implements TabletInsertionEvent, EnrichedE
     this.referenceCount = new AtomicInteger(0);
   }
 
+  /** Returns the insert node held by this event. */
+  public InsertNode getInsertNode() {
+    return insertNode;
+  }
   @Override
   public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) {
     throw new UnsupportedOperationException("Not implemented yet");
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index 1a460b6b0a2..65f78cebac2 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.core.event.impl;
 
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -28,15 +30,46 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class PipeTsFileInsertionEvent implements TsFileInsertionEvent, EnrichedEvent {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
 
   private File tsFile;
+  private final AtomicBoolean isClosed;
 
-  public PipeTsFileInsertionEvent(File tsFile) {
-    this.tsFile = tsFile;
+  /** Wraps the resource's TsFile and tracks its close state through a close listener. */
+  public PipeTsFileInsertionEvent(TsFileResource resource) {
+    tsFile = resource.getTsFile();
+    isClosed = new AtomicBoolean(resource.isClosed());
+    // register close listener if TsFile is not closed
+    final TsFileProcessor processor = isClosed.get() ? null : resource.getProcessor();
+    if (processor != null) {
+      processor.addCloseFileListener(
+          o -> {
+            synchronized (isClosed) {
+              isClosed.set(true);
+              isClosed.notifyAll();
+            }
+          });
+    }
+    // check again: the TsFile may have been closed before the listener got registered
+    isClosed.compareAndSet(false, resource.isClosed());
+  }
+  /** Blocks until the TsFile is marked closed; returns immediately if it already is. */
+  public void waitForTsFileClose() throws InterruptedException {
+    if (!isClosed.get()) {
+      synchronized (isClosed) {
+        while (!isClosed.get()) { // re-check the flag after every wakeup
+          isClosed.wait();
+        }
+      }
+    }
+  }
+  /** Returns the TsFile this event was created for. */
+  public File getTsFile() {
+    return tsFile;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 29cbefa8d14..4c98c5193be 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -24,15 +24,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 
-import java.io.File;
-
 public class PipeRealtimeCollectEventFactory {
 
   private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new TsFileEpochManager();
 
-  public static PipeRealtimeCollectEvent createCollectEvent(File tsFile, TsFileResource resource) {
+  public static PipeRealtimeCollectEvent createCollectEvent(TsFileResource resource) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
-        new PipeTsFileInsertionEvent(tsFile), resource);
+        new PipeTsFileInsertionEvent(resource), resource);
   }
 
   public static PipeRealtimeCollectEvent createCollectEvent(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
index 8bde5dda46c..a1f443a9d43 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeEventCollector.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.core.event.view.collector;
 
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
-import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.deletion.DeletionEvent;
@@ -65,7 +64,7 @@ public class PipeEventCollector implements EventCollector {
 
   private synchronized void collect(Event event) {
     if (event instanceof EnrichedEvent) {
-      ((EnrichedEvent) event).increaseReferenceCount(PipeConnectorSubtask.class.getName());
+      ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
     }
 
     while (!bufferQueue.isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index bf4f8daf2e1..2952476dbbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -29,13 +29,10 @@ import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.NotThreadSafe;
-
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
-@NotThreadSafe
 public abstract class PipeSubtaskExecutor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtaskExecutor.class);
@@ -71,7 +68,7 @@ public abstract class PipeSubtaskExecutor {
     subtask.bindExecutors(subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor);
   }
 
-  public final void start(String subTaskID) {
+  public final synchronized void start(String subTaskID) {
     if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
       LOGGER.warn("The subtask {} is not registered.", subTaskID);
       return;
@@ -79,7 +76,9 @@ public abstract class PipeSubtaskExecutor {
 
     final PipeSubtask subtask = registeredIdSubtaskMapper.get(subTaskID);
     if (subtask.isSubmittingSelf()) {
-      LOGGER.info("The subtask {} is already running.", subTaskID);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("The subtask {} is already running.", subTaskID);
+      }
     } else {
       subtask.allowSubmittingSelf();
       subtask.submitSelf();
@@ -87,7 +86,7 @@ public abstract class PipeSubtaskExecutor {
     }
   }
 
-  public final void stop(String subTaskID) {
+  public final synchronized void stop(String subTaskID) {
     if (!registeredIdSubtaskMapper.containsKey(subTaskID)) {
       LOGGER.warn("The subtask {} is not registered.", subTaskID);
       return;
@@ -96,7 +95,7 @@ public abstract class PipeSubtaskExecutor {
     registeredIdSubtaskMapper.get(subTaskID).disallowSubmittingSelf();
   }
 
-  public final void deregister(String subTaskID) {
+  public final synchronized void deregister(String subTaskID) {
     stop(subTaskID);
 
     final PipeSubtask subtask = registeredIdSubtaskMapper.remove(subTaskID);
@@ -122,7 +121,7 @@ public abstract class PipeSubtaskExecutor {
 
   /////////////////////// executor management  ///////////////////////
 
-  public final void shutdown() {
+  public final synchronized void shutdown() {
     if (isShutdown()) {
       return;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
index b8257845266..447ae5bdf09 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
@@ -47,14 +47,14 @@ public class PipeBuilder {
     final Map<TConsensusGroupId, PipeTask> consensusGroupIdToPipeTaskMap = new HashMap<>();
 
     final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta();
-    for (Map.Entry<TConsensusGroupId, PipeTaskMeta> consensusGroupIdPipeTaskMeta :
+    for (Map.Entry<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToPipeTaskMeta :
         pipeRuntimeMeta.getConsensusGroupIdToTaskMetaMap().entrySet()) {
       consensusGroupIdToPipeTaskMap.put(
-          consensusGroupIdPipeTaskMeta.getKey(),
+          consensusGroupIdToPipeTaskMeta.getKey(),
           new PipeTaskBuilder(
                   pipeName,
-                  Integer.toString(consensusGroupIdPipeTaskMeta.getValue().getRegionLeader()),
-                  // TODO: consensusGroupIdPipeTaskMeta.getValue().getProgressIndex() is not used
+                  Integer.toString(consensusGroupIdToPipeTaskMeta.getKey().getId()),
+                  // TODO: consensusGroupIdToPipeTaskMeta.getValue().getProgressIndex() is not used
                   collectorParameters,
                   processorParameters,
                   connectorParameters)
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 31e935256ff..a4866e87e8e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfig
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import java.util.HashMap;
+
 public class PipeTaskCollectorStage extends PipeTaskStage {
 
   private final PipeParameters collectorParameters;
@@ -52,19 +54,28 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
   private final PipeCollector pipeCollector;
 
   public PipeTaskCollectorStage(String dataRegionId, PipeParameters collectorParameters) {
-    this.collectorParameters = collectorParameters;
-    // set data region id to collector parameters, so that collector can get data region id inside
-    // collector
-    collectorParameters.getAttribute().put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
-
+    // TODO: avoid if-else, use reflection to create collector all the time
     if (collectorParameters
         .getStringOrDefault(
             PipeCollectorConstant.COLLECTOR_KEY,
-            BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())
-        .equals(BuiltinPipePlugin.DEFAULT_COLLECTOR.getPipePluginName())) {
+            BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())
+        .equals(BuiltinPipePlugin.IOTDB_COLLECTOR.getPipePluginName())) {
+      // we want to pass data region id to collector, so we need to create a new collector
+      // parameters and put data region id into it. we can't put data region id into collector
+      // parameters directly, because the given collector parameters may be used by other pipe task.
+      this.collectorParameters =
+          new PipeParameters(new HashMap<>(collectorParameters.getAttribute()));
+      // set data region id to collector parameters, so that collector can get data region id inside
+      // collector
+      this.collectorParameters
+          .getAttribute()
+          .put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
+
       collectorPendingQueue = new ListenableUnblockingPendingQueue<>();
       this.pipeCollector = new IoTDBDataRegionCollector(collectorPendingQueue);
     } else {
+      this.collectorParameters = collectorParameters;
+
       this.pipeCollector = PipeAgent.plugin().reflectCollector(collectorParameters);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 62cadbde87d..7f147b130bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
-import org.apache.iotdb.db.pipe.core.connector.PipeConnectorSubtaskManager;
+import org.apache.iotdb.db.pipe.core.connector.manager.PipeConnectorSubtaskManager;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
@@ -34,17 +34,16 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
 
   public PipeTaskConnectorStage(PipeParameters pipeConnectorParameters) {
     this.pipeConnectorParameters = pipeConnectorParameters;
-  }
-
-  @Override
-  public void createSubtask() throws PipeException {
     connectorSubtaskId =
         PipeConnectorSubtaskManager.instance()
             .register(
                 PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(),
-                pipeConnectorParameters);
+                pipeConnectorParameters); // TODO: should split to create
   }
 
+  @Override
+  public void createSubtask() throws PipeException {}
+
   @Override
   public void startSubtask() throws PipeException {
     PipeConnectorSubtaskManager.instance().start(connectorSubtaskId);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 3ba031a23a4..6bbec4480c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -79,14 +79,18 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
             pipeProcessor,
             pipeConnectorOutputEventCollector);
 
+    final PipeTaskStage pipeTaskStage = this;
     this.pipeCollectorInputPendingQueue =
         pipeCollectorInputPendingQueue != null
             ? pipeCollectorInputPendingQueue
                 .registerEmptyToNotEmptyListener(
                     taskId,
                     () -> {
-                      if (status == PipeStatus.RUNNING) {
-                        executor.start(pipeProcessorSubtask.getTaskID());
+                      // status can be changed by other threads calling pipeTaskStage's methods
+                      synchronized (pipeTaskStage) {
+                        if (status == PipeStatus.RUNNING) {
+                          executor.start(pipeProcessorSubtask.getTaskID());
+                        }
                       }
                     })
                 .registerNotEmptyToEmptyListener(
@@ -99,10 +103,13 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
             .registerFullToNotFullListener(
                 taskId,
                 () -> {
-                  // only start when the pipe is running
-                  if (status == PipeStatus.RUNNING) {
-                    pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
-                    executor.start(pipeProcessorSubtask.getTaskID());
+                  // status can be changed by other threads calling pipeTaskStage's methods
+                  synchronized (pipeTaskStage) {
+                    // only start when the pipe is running
+                    if (status == PipeStatus.RUNNING) {
+                      pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
+                      executor.start(pipeProcessorSubtask.getTaskID());
+                    }
                   }
                 });
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index d9f2ea84bc7..80f366eb096 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -53,7 +52,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
 
   // TODO: for a while
   @Override
-  protected void executeForAWhile() {
+  protected synchronized void executeForAWhile() {
     try {
       // TODO: reduce the frequency of heartbeat
       outputPipeConnector.heartbeat();
@@ -62,7 +61,9 @@ public class PipeConnectorSubtask extends PipeSubtask {
           "PipeConnector: failed to connect to the target system.", e);
     }
 
-    final Event event = inputPendingQueue.poll();
+    final Event event = lastEvent != null ? lastEvent : inputPendingQueue.poll();
+    // record this event for retrying on connection failure or other exceptions
+    lastEvent = event;
     if (event == null) {
       return;
     }
@@ -82,15 +83,15 @@ public class PipeConnectorSubtask extends PipeSubtask {
           throw new UnsupportedOperationException(
               "Unsupported event type: " + event.getClass().getName());
       }
+
+      releaseLastEvent();
+    } catch (PipeConnectionException e) {
+      throw e;
     } catch (Exception e) {
       e.printStackTrace();
       throw new PipeException(
           "Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.",
           e);
-    } finally {
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) event).decreaseReferenceCount(PipeConnectorSubtask.class.getName());
-      }
     }
   }
 
@@ -125,7 +126,13 @@ public class PipeConnectorSubtask extends PipeSubtask {
             MAX_RETRY_TIMES,
             taskID);
         lastFailedCause = throwable;
+
         PipeAgent.runtime().report(this);
+
+        // although the pipe task will be stopped, we still don't release the last event here
+        // because we need to keep it for the next retry. if the user restarts the task, the
+        // last event will be processed again. the last event will be released when the task
+        // is dropped or after the event has been transferred successfully.
         return;
       }
     }
@@ -135,9 +142,15 @@ public class PipeConnectorSubtask extends PipeSubtask {
   }
 
   @Override
-  public void close() {
+  // synchronized for outputPipeConnector.close() and releaseLastEvent() in super.close().
+  // make sure that lastEvent will not be updated after outputPipeConnector.close() to avoid
+  // a resource leak caused by lastEvent not being released.
+  public synchronized void close() {
     try {
       outputPipeConnector.close();
+
+      // should be called after outputPipeConnector.close()
+      super.close();
     } catch (Exception e) {
       e.printStackTrace();
       LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 4d15d3a3528..feb584fcaff 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
-import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -52,8 +51,10 @@ public class PipeProcessorSubtask extends PipeSubtask {
   }
 
   @Override
-  protected void executeForAWhile() throws Exception {
-    Event event = inputEventSupplier.supply();
+  protected synchronized void executeForAWhile() throws Exception {
+    final Event event = lastEvent != null ? lastEvent : inputEventSupplier.supply();
+    // record the last event so that it can be retried when an exception occurs
+    lastEvent = event;
     if (event == null) {
       return;
     }
@@ -73,22 +74,26 @@ public class PipeProcessorSubtask extends PipeSubtask {
           throw new UnsupportedOperationException(
               "Unsupported event type: " + event.getClass().getName());
       }
+
+      releaseLastEvent();
     } catch (Exception e) {
       e.printStackTrace();
       throw new PipeException(
           "Error occurred during executing PipeProcessor#process, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.",
           e);
-    } finally {
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) event).decreaseReferenceCount(PipeProcessorSubtask.class.getName());
-      }
     }
   }
 
   @Override
-  public void close() {
+  // synchronized for pipeProcessor.close() and releaseLastEvent() in super.close().
+  // make sure that lastEvent will not be updated after pipeProcessor.close() to avoid
+  // a resource leak caused by lastEvent not being released.
+  public synchronized void close() {
     try {
       pipeProcessor.close();
+
+      // should be called after pipeProcessor.close()
+      super.close();
     } catch (Exception e) {
       e.printStackTrace();
       LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index e01c1390186..c770b983b9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.pipe.task.subtask;
 
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -42,15 +44,14 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
 
   private ListeningExecutorService subtaskWorkerThreadPoolExecutor;
   private ExecutorService subtaskCallbackListeningExecutor;
-
   private final DecoratingLock callbackDecoratingLock = new DecoratingLock();
+  private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true);
 
   protected static final int MAX_RETRY_TIMES = 5;
   private final AtomicInteger retryCount = new AtomicInteger(0);
-
   protected Throwable lastFailedCause;
 
-  private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true);
+  protected Event lastEvent;
 
   public PipeSubtask(String taskID) {
     super();
@@ -95,7 +96,13 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
           retryCount,
           throwable);
       lastFailedCause = throwable;
+
       PipeAgent.runtime().report(this);
+
+      // although the pipe task will be stopped, we still don't release the last event here
+      // because we need to keep it for the next retry. if the user restarts the task, the
+      // last event will be processed again. the last event will be released when the task
+      // is dropped or after the event has been processed successfully.
     }
   }
 
@@ -114,6 +121,7 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
   }
 
   public void allowSubmittingSelf() {
+    retryCount.set(0);
     shouldStopSubmittingSelf.set(false);
   }
 
@@ -125,6 +133,20 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void
     return !shouldStopSubmittingSelf.get();
   }
 
+  @Override
+  public synchronized void close() {
+    releaseLastEvent();
+  }
+
+  protected void releaseLastEvent() {
+    if (lastEvent != null) {
+      if (lastEvent instanceof EnrichedEvent) {
+        ((EnrichedEvent) lastEvent).decreaseReferenceCount(PipeSubtask.class.getName());
+      }
+      lastEvent = null;
+    }
+  }
+
   public String getTaskID() {
     return taskID;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index c5e26996703..28bec0979bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemp
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.DropSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.SetSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.clientsession.IClientSession;
 import org.apache.iotdb.db.quotas.DataNodeThrottleQuotaManager;
@@ -75,6 +76,8 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
 import org.apache.iotdb.service.rpc.thrift.TCreateTimeseriesUsingSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
@@ -2106,6 +2109,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     return SyncService.getInstance().transportFile(metaInfo, buff);
   }
 
+  @Override
+  public TPipeTransferResp pipeTransfer(TPipeTransferReq req) {
+    return PipeAgent.receiver().transfer(req, partitionFetcher, schemaFetcher);
+  }
+
   @Override
   public TSBackupConfigurationResp getBackupConfiguration() {
     return new TSBackupConfigurationResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
@@ -2244,5 +2252,6 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       closeSession(req);
     }
     SyncService.getInstance().handleClientExit();
+    PipeAgent.receiver().handleClientExit();
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/connector/PipeThriftRequestTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/connector/PipeThriftRequestTest.java
new file mode 100644
index 00000000000..a3b65a66bfd
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/connector/PipeThriftRequestTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.reponse.PipeTransferFilePieceResp;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFilePieceReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFileSealReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class PipeThriftRequestTest {
+
+  private static final String TIME_PRECISION = "ms";
+
+  @Test
+  public void testPipeValidateHandshakeReq() throws IOException {
+    PipeTransferHandshakeReq req = PipeTransferHandshakeReq.toTPipeTransferReq(TIME_PRECISION);
+    PipeTransferHandshakeReq deserializeReq = PipeTransferHandshakeReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+    Assert.assertEquals(req.getTimestampPrecision(), deserializeReq.getTimestampPrecision());
+  }
+
+  @Test
+  public void testPipeTransferInsertNodeReq() {
+    PipeTransferInsertNodeReq req =
+        PipeTransferInsertNodeReq.toTPipeTransferReq(
+            new InsertRowNode(
+                new PlanNodeId(""),
+                new PartialPath(new String[] {"root", "sg", "d"}),
+                false,
+                new String[] {"s"},
+                new TSDataType[] {TSDataType.INT32},
+                1,
+                new Object[] {1},
+                false));
+    PipeTransferInsertNodeReq deserializeReq = PipeTransferInsertNodeReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+    Assert.assertEquals(req.getInsertNode(), deserializeReq.getInsertNode());
+  }
+
+  @Test
+  public void testPipeTransferFilePieceReq() throws IOException {
+    byte[] body = "testPipeTransferFilePieceReq".getBytes();
+    String fileName = "1.tsfile";
+
+    PipeTransferFilePieceReq req = PipeTransferFilePieceReq.toTPipeTransferReq(fileName, 0, body);
+    PipeTransferFilePieceReq deserializeReq = PipeTransferFilePieceReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+    Assert.assertEquals(req.getFileName(), deserializeReq.getFileName());
+    Assert.assertEquals(req.getStartWritingOffset(), deserializeReq.getStartWritingOffset());
+    Assert.assertArrayEquals(req.getFilePiece(), deserializeReq.getFilePiece());
+  }
+
+  @Test
+  public void testPipeTransferFileSealReq() throws IOException {
+    String fileName = "1.tsfile";
+
+    PipeTransferFileSealReq req = PipeTransferFileSealReq.toTPipeTransferReq(fileName, 100);
+    PipeTransferFileSealReq deserializeReq = PipeTransferFileSealReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+    Assert.assertEquals(req.getFileName(), deserializeReq.getFileName());
+    Assert.assertEquals(req.getFileLength(), deserializeReq.getFileLength());
+  }
+
+  @Test
+  public void testPIpeTransferFilePieceResp() throws IOException {
+    PipeTransferFilePieceResp resp =
+        PipeTransferFilePieceResp.toTPipeTransferResp(RpcUtils.SUCCESS_STATUS, 100);
+    PipeTransferFilePieceResp deserializeResp =
+        PipeTransferFilePieceResp.fromTPipeTransferResp(resp);
+
+    Assert.assertEquals(resp.getStatus(), deserializeResp.getStatus());
+    Assert.assertEquals(resp.getEndWritingOffset(), deserializeResp.getEndWritingOffset());
+  }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 937b5ea676a..79871f857af 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -207,7 +207,14 @@ public enum TSStatusCode {
   WRITE_SIZE_EXCEEDED(1705),
   READ_SIZE_EXCEEDED(1706),
   QUOTA_MEM_QUERY_NOT_ENOUGH(1707),
-  QUERY_CPU_QUERY_NOT_ENOUGH(1708);
+  QUERY_CPU_QUERY_NOT_ENOUGH(1708),
+
+  // Pipe
+  PIPE_VERSION_ERROR(1800),
+  PIPE_TYPE_ERROR(1801),
+  PIPE_HANDSHAKE_ERROR(1802),
+  PIPE_TRANSFER_FILE_OFFSET_RESET(1803),
+  PIPE_TRANSFER_FILE_ERROR(1804);
 
   private final int statusCode;
 
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 6779bbf89cc..8f6cf8e6297 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -455,6 +455,17 @@ struct TSyncTransportMetaInfo{
   2:required i64 startIndex
 }
 
+struct TPipeTransferReq {
+  1:required i8 version
+  2:required i16 type
+  3:required binary body
+}
+
+struct TPipeTransferResp {
+  1:required common.TSStatus status
+  2:optional binary body
+}
+
 struct TSBackupConfigurationResp {
   1: required common.TSStatus status
   2: optional bool enableOperationSync
@@ -595,6 +606,8 @@ service IClientRPCService {
 
   common.TSStatus sendFile(1:TSyncTransportMetaInfo metaInfo, 2:binary buff);
 
+  TPipeTransferResp pipeTransfer(TPipeTransferReq req);
+
   TSBackupConfigurationResp getBackupConfiguration();
 
   TSConnectionInfoResp fetchAllConnectionsInfo();