You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/11 07:25:01 UTC

[iotdb] branch master updated: [IOTDB-4528] Operate Pipe Procedure (#7533)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e9c2871b27 [IOTDB-4528] Operate Pipe Procedure  (#7533)
e9c2871b27 is described below

commit e9c2871b273e77df1758dde1389ba490befca4c5
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Tue Oct 11 15:24:55 2022 +0800

    [IOTDB-4528] Operate Pipe Procedure  (#7533)
---
 .../confignode/client/DataNodeRequestType.java     |   4 +
 .../client/async/AsyncDataNodeClientPool.java      |  14 ++
 .../client/async/handlers/AsyncClientHandler.java  |   2 +
 .../consensus/request/ConfigPhysicalPlan.java      |  12 ++
 .../consensus/request/ConfigPhysicalPlanType.java  |   3 +
 .../request/write/sync/GetPipeSinkPlan.java        |   1 +
 ...GetPipeSinkPlan.java => PreCreatePipePlan.java} |  25 +--
 ...GetPipeSinkPlan.java => SetPipeStatusPlan.java} |  41 +++--
 .../{GetPipeSinkPlan.java => ShowPipePlan.java}    |  23 +--
 .../confignode/consensus/response/PipeResp.java    |  37 ++++-
 .../iotdb/confignode/manager/ConfigManager.java    |  59 +++++++
 .../apache/iotdb/confignode/manager/IManager.java  |  50 ++++++
 .../iotdb/confignode/manager/ProcedureManager.java |  74 +++++++++
 .../iotdb/confignode/manager/SyncManager.java      |  98 ++++++++++-
 .../persistence/executor/ConfigPlanExecutor.java   |   9 +
 .../persistence/sync/ClusterSyncInfo.java          |  79 ++++++++-
 .../procedure/env/ConfigNodeProcedureEnv.java      |   7 +
 .../impl/sync/AbstractOperatePipeProcedure.java    | 142 ++++++++++++++++
 .../procedure/impl/sync/CreatePipeProcedure.java   | 146 ++++++++++++++++
 .../procedure/impl/sync/DropPipeProcedure.java     | 133 +++++++++++++++
 .../procedure/impl/sync/StartPipeProcedure.java    | 132 +++++++++++++++
 .../procedure/impl/sync/StopPipeProcedure.java     | 132 +++++++++++++++
 .../procedure/state/sync/OperatePipeState.java     |  12 +-
 .../procedure/store/ProcedureFactory.java          |  30 +++-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  28 ++++
 .../request/ConfigPhysicalPlanSerDeTest.java       |  42 +++++
 .../procedure/impl/OperatePipeProcedureTest.java   | 122 ++++++++++++++
 .../org/apache/iotdb/db/it/sync/IoTDBPipeIT.java   |   3 +-
 .../sync/PipeNotExistException.java}               |  17 +-
 .../iotdb/commons/sync/metadata/SyncMetadata.java  |  38 ++---
 .../commons/sync/persistence/SyncLogReader.java    |   7 +-
 .../apache/iotdb/commons/sync/pipe/PipeInfo.java   |  58 +++++--
 .../apache/iotdb/commons/sync/pipe/PipeStatus.java |   7 +-
 .../iotdb/commons/sync/pipe/SyncOperation.java     |   2 +
 .../iotdb/commons/sync/pipe/TsFilePipeInfo.java    |  15 ++
 .../iotdb/commons/sync/pipesink/IoTDBPipeSink.java |   8 +
 .../iotdb/commons/sync/pipesink/PipeSink.java      |  20 +++
 .../commons/utils/ThriftConfigNodeSerDeUtils.java  |  23 ++-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  83 ++++++++++
 .../iotdb/db/localconfignode/LocalConfigNode.java  |  11 +-
 .../config/executor/ClusterConfigTaskExecutor.java | 107 +++++++++---
 .../executor/StandaloneConfigTaskExecutor.java     |  12 +-
 .../config/sys/sync/ShowPipeSinkTask.java          |   2 +-
 .../execution/config/sys/sync/ShowPipeTask.java    |   6 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   4 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  40 +++++
 .../java/org/apache/iotdb/db/sync/SyncService.java | 184 +++++++++++----------
 .../db/sync/common/ClusterSyncInfoFetcher.java     | 144 ++++++++++++++++
 .../iotdb/db/sync/common/ISyncInfoFetcher.java     |  10 +-
 .../apache/iotdb/db/sync/common/LocalSyncInfo.java |  41 ++---
 .../iotdb/db/sync/common/LocalSyncInfoFetcher.java |  16 +-
 .../db/sync/sender/pipe/ExternalPipeSink.java      |   8 +
 .../apache/iotdb/db/utils/sync/SyncPipeUtil.java   |  74 ++++-----
 .../sync/receiver/manager/LocalSyncInfoTest.java   |  10 +-
 .../src/main/thrift/confignode.thrift              |  30 +++-
 thrift/src/main/thrift/datanode.thrift             |  20 +++
 56 files changed, 2132 insertions(+), 325 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index e3ac3c249e..e2e52f3613 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -60,6 +60,10 @@ public enum DataNodeRequestType {
   ACTIVE_TRIGGER_INSTANCE,
   INACTIVE_TRIGGER_INSTANCE,
 
+  /** Sync */
+  PRE_CREATE_PIPE,
+  OPERATE_PIPE,
+
   /** TEMPLATE */
   UPDATE_TEMPLATE,
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 98fe826e93..24865fddbe 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
@@ -43,6 +44,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
@@ -244,6 +246,18 @@ public class AsyncDataNodeClientPool {
               (DeleteTimeSeriesRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
           break;
+        case PRE_CREATE_PIPE:
+          client.createPipeOnDataNode(
+              (TCreatePipeOnDataNodeReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case OPERATE_PIPE:
+          client.operatePipeOnDataNode(
+              (TOperatePipeOnDataNodeReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
         default:
           LOGGER.error(
               "Unexpected DataNode Request Type: {} when sendAsyncRequestToDataNode",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index 60c7878fe4..a10ef2306c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -188,6 +188,8 @@ public class AsyncClientHandler<Q, R> {
       case UPDATE_REGION_ROUTE_MAP:
       case BROADCAST_LATEST_CONFIG_NODE_GROUP:
       case INVALIDATE_MATCHED_SCHEMA_CACHE:
+      case PRE_CREATE_PIPE:
+      case OPERATE_PIPE:
       default:
         return new AsyncTSStatusRPCHandler(
             requestType,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 46b72279bf..62d1bab979 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -63,6 +63,9 @@ import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimeP
 import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
@@ -274,6 +277,15 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
         case GetPipeSink:
           req = new GetPipeSinkPlan();
           break;
+        case PreCreatePipe:
+          req = new PreCreatePipePlan();
+          break;
+        case SetPipeStatus:
+          req = new SetPipeStatusPlan();
+          break;
+        case ShowPipe:
+          req = new ShowPipePlan();
+          break;
         case GetRouting:
           req = new GetRoutingPlan();
           break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 6f467b5c01..ca96142d45 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -81,6 +81,9 @@ public enum ConfigPhysicalPlanType {
   CreatePipeSink,
   DropPipeSink,
   GetPipeSink,
+  PreCreatePipe,
+  SetPipeStatus,
+  ShowPipe,
   AddTriggerInTable,
   DeleteTriggerInTable,
   GetTriggerTable,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
index 688578ed7e..9cfcecfc65 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class GetPipeSinkPlan extends ConfigPhysicalPlan {
+  /** An empty pipeSinkName means get all PIPESINKs. */
   private String pipeSinkName;
 
   public GetPipeSinkPlan() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlan.java
similarity index 70%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlan.java
index 688578ed7e..016428a083 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlan.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.confignode.consensus.request.write.sync;
 
-import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 
@@ -26,30 +26,31 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class GetPipeSinkPlan extends ConfigPhysicalPlan {
-  private String pipeSinkName;
+public class PreCreatePipePlan extends ConfigPhysicalPlan {
 
-  public GetPipeSinkPlan() {
-    super(ConfigPhysicalPlanType.GetPipeSink);
+  private PipeInfo pipeInfo;
+
+  public PreCreatePipePlan() {
+    super(ConfigPhysicalPlanType.PreCreatePipe);
   }
 
-  public GetPipeSinkPlan(String pipeSinkName) {
+  public PreCreatePipePlan(PipeInfo pipeInfo) {
     this();
-    this.pipeSinkName = pipeSinkName;
+    this.pipeInfo = pipeInfo;
   }
 
-  public String getPipeSinkName() {
-    return pipeSinkName;
+  public PipeInfo getPipeInfo() {
+    return pipeInfo;
   }
 
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
-    stream.writeInt(ConfigPhysicalPlanType.GetPipeSink.ordinal());
-    BasicStructureSerDeUtil.write(pipeSinkName, stream);
+    stream.writeInt(ConfigPhysicalPlanType.PreCreatePipe.ordinal());
+    pipeInfo.serialize(stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
-    pipeSinkName = BasicStructureSerDeUtil.readString(buffer);
+    pipeInfo = PipeInfo.deserializePipeInfo(buffer);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
similarity index 56%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
index 688578ed7e..13b0b6dfb1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
@@ -18,38 +18,55 @@
  */
 package org.apache.iotdb.confignode.consensus.request.write.sync;
 
-import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class GetPipeSinkPlan extends ConfigPhysicalPlan {
-  private String pipeSinkName;
+public class SetPipeStatusPlan extends ConfigPhysicalPlan {
+  private String pipeName;
+  private PipeStatus pipeStatus;
 
-  public GetPipeSinkPlan() {
-    super(ConfigPhysicalPlanType.GetPipeSink);
+  public SetPipeStatusPlan() {
+    super(ConfigPhysicalPlanType.SetPipeStatus);
   }
 
-  public GetPipeSinkPlan(String pipeSinkName) {
+  public SetPipeStatusPlan(String pipeName, PipeStatus pipeStatus) {
     this();
-    this.pipeSinkName = pipeSinkName;
+    this.pipeName = pipeName;
+    this.pipeStatus = pipeStatus;
   }
 
-  public String getPipeSinkName() {
-    return pipeSinkName;
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public void setPipeName(String pipeName) {
+    this.pipeName = pipeName;
+  }
+
+  public PipeStatus getPipeStatus() {
+    return pipeStatus;
+  }
+
+  public void setPipeStatus(PipeStatus pipeStatus) {
+    this.pipeStatus = pipeStatus;
   }
 
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
-    stream.writeInt(ConfigPhysicalPlanType.GetPipeSink.ordinal());
-    BasicStructureSerDeUtil.write(pipeSinkName, stream);
+    stream.writeInt(ConfigPhysicalPlanType.SetPipeStatus.ordinal());
+    ReadWriteIOUtils.write(pipeName, stream);
+    ReadWriteIOUtils.write((byte) pipeStatus.ordinal(), stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
-    pipeSinkName = BasicStructureSerDeUtil.readString(buffer);
+    pipeName = ReadWriteIOUtils.readString(buffer);
+    pipeStatus = PipeStatus.values()[ReadWriteIOUtils.readByte(buffer)];
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlan.java
similarity index 73%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlan.java
index 688578ed7e..36c6fc3083 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlan.java
@@ -26,30 +26,31 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class GetPipeSinkPlan extends ConfigPhysicalPlan {
-  private String pipeSinkName;
+public class ShowPipePlan extends ConfigPhysicalPlan {
+  /** An empty pipeName means show all PIPEs. */
+  private String pipeName;
 
-  public GetPipeSinkPlan() {
-    super(ConfigPhysicalPlanType.GetPipeSink);
+  public ShowPipePlan() {
+    super(ConfigPhysicalPlanType.ShowPipe);
   }
 
-  public GetPipeSinkPlan(String pipeSinkName) {
+  public ShowPipePlan(String pipeName) {
     this();
-    this.pipeSinkName = pipeSinkName;
+    this.pipeName = pipeName;
   }
 
-  public String getPipeSinkName() {
-    return pipeSinkName;
+  public String getPipeName() {
+    return pipeName;
   }
 
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
-    stream.writeInt(ConfigPhysicalPlanType.GetPipeSink.ordinal());
-    BasicStructureSerDeUtil.write(pipeSinkName, stream);
+    stream.writeInt(ConfigPhysicalPlanType.ShowPipe.ordinal());
+    BasicStructureSerDeUtil.write(pipeName, stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
-    pipeSinkName = BasicStructureSerDeUtil.readString(buffer);
+    pipeName = BasicStructureSerDeUtil.readString(buffer);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/PipeResp.java
similarity index 56%
copy from node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/PipeResp.java
index d2bfdf743b..11a5c618d0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/PipeResp.java
@@ -16,13 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.sync.pipe;
-
-public enum SyncOperation {
-  CREATE_PIPESINK,
-  DROP_PIPESINK,
-  CREATE_PIPE,
-  START_PIPE,
-  STOP_PIPE,
-  DROP_PIPE
+package org.apache.iotdb.confignode.consensus.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.consensus.common.DataSet;
+
+import java.util.List;
+
+public class PipeResp implements DataSet {
+
+  TSStatus status;
+  List<PipeInfo> pipeInfoList;
+
+  public TSStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(TSStatus status) {
+    this.status = status;
+  }
+
+  public List<PipeInfo> getPipeInfoList() {
+    return pipeInfoList;
+  }
+
+  public void setPipeInfoList(List<PipeInfo> pipeInfoList) {
+    this.pipeInfoList = pipeInfoList;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 2e4729cbc0..c1bde6dbc2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -102,6 +102,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -110,6 +111,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -609,6 +612,11 @@ public class ConfigManager implements IManager {
     return triggerManager;
   }
 
+  @Override
+  public SyncManager getSyncManager() {
+    return syncManager;
+  }
+
   @Override
   public TSStatus operatePermission(AuthorPlan authorPlan) {
     TSStatus status = confirmLeader();
@@ -1010,6 +1018,57 @@ public class ConfigManager implements IManager {
     }
   }
 
+  @Override
+  public TSStatus createPipe(TPipeInfo pipeInfo) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return procedureManager.createPipe(pipeInfo);
+    } else {
+      return status;
+    }
+  }
+
+  @Override
+  public TSStatus startPipe(String pipeName) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return procedureManager.startPipe(pipeName);
+    } else {
+      return status;
+    }
+  }
+
+  @Override
+  public TSStatus stopPipe(String pipeName) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return procedureManager.stopPipe(pipeName);
+    } else {
+      return status;
+    }
+  }
+
+  @Override
+  public TSStatus dropPipe(String pipeName) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return procedureManager.dropPipe(pipeName);
+    } else {
+      return status;
+    }
+  }
+
+  @Override
+  public TShowPipeResp showPipe(TShowPipeReq req) {
+    TSStatus status = confirmLeader();
+    TShowPipeResp resp = new TShowPipeResp();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return syncManager.showPipe(req.getPipeName());
+    } else {
+      return resp.setStatus(status);
+    }
+  }
+
   @Override
   @TestOnly
   public TGetRoutingResp getRouting(GetRoutingPlan plan) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index d028f3425c..514cc03ba7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -70,6 +71,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
 import org.apache.iotdb.consensus.common.DataSet;
 
@@ -130,6 +133,13 @@ public interface IManager {
    */
   TriggerManager getTriggerManager();
 
+  /**
+   * Get SyncManager
+   *
+   * @return SyncManager instance
+   */
+  SyncManager getSyncManager();
+
   /**
    * Get ProcedureManager
    *
@@ -406,6 +416,46 @@ public interface IManager {
    */
   TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req);
 
+  /**
+   * Create Pipe
+   *
+   * @param pipeInfo Info about Pipe
+   * @return TSStatus
+   */
+  TSStatus createPipe(TPipeInfo pipeInfo);
+
+  /**
+   * Start Pipe
+   *
+   * @param pipeName name of Pipe
+   * @return TSStatus
+   */
+  TSStatus startPipe(String pipeName);
+
+  /**
+   * Stop Pipe
+   *
+   * @param pipeName name of Pipe
+   * @return TSStatus
+   */
+  TSStatus stopPipe(String pipeName);
+
+  /**
+   * Drop Pipe
+   *
+   * @param pipeName name of Pipe
+   * @return TSStatus
+   */
+  TSStatus dropPipe(String pipeName);
+
+  /**
+   * Get Pipe by name. If pipeName is empty, get all Pipes.
+   *
+   * @param req specifies the pipeName
+   * @return TShowPipeResp containing the TShowPipeInfo
+   */
+  TShowPipeResp showPipe(TShowPipeReq req);
+
   TGetRoutingResp getRouting(GetRoutingPlan plan);
 
   TGetTimeSlotListResp getTimeSlotList(GetTimeSlotListPlan plan);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index e57b16e56f..ef583c341e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.manager;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.utils.StatusUtils;
@@ -44,6 +45,10 @@ import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroup
 import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteStorageGroupProcedure;
 import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteTimeSeriesProcedure;
 import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.CreatePipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.DropPipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.StartPipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.StopPipeProcedure;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore;
@@ -52,6 +57,7 @@ import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.procedure.store.ProcedureStore;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -266,6 +272,74 @@ public class ProcedureManager {
     }
   }
 
+  public TSStatus createPipe(TPipeInfo req) {
+    try {
+      long procedureId = executor.submitProcedure(new CreatePipeProcedure(req));
+      List<TSStatus> statusList = new ArrayList<>();
+      boolean isSucceed =
+          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+      if (isSucceed) {
+        return RpcUtils.SUCCESS_STATUS;
+      } else {
+        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+            .setMessage(statusList.get(0).getMessage());
+      }
+    } catch (PipeException e) {
+      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+  }
+
+  public TSStatus startPipe(String pipeName) {
+    try {
+      long procedureId = executor.submitProcedure(new StartPipeProcedure(pipeName));
+      List<TSStatus> statusList = new ArrayList<>();
+      boolean isSucceed =
+          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+      if (isSucceed) {
+        return RpcUtils.SUCCESS_STATUS;
+      } else {
+        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+            .setMessage(statusList.get(0).getMessage());
+      }
+    } catch (PipeException e) {
+      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+  }
+
+  public TSStatus stopPipe(String pipeName) {
+    try {
+      long procedureId = executor.submitProcedure(new StopPipeProcedure(pipeName));
+      List<TSStatus> statusList = new ArrayList<>();
+      boolean isSucceed =
+          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+      if (isSucceed) {
+        return RpcUtils.SUCCESS_STATUS;
+      } else {
+        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+            .setMessage(statusList.get(0).getMessage());
+      }
+    } catch (PipeException e) {
+      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+  }
+
+  public TSStatus dropPipe(String pipeName) {
+    try {
+      long procedureId = executor.submitProcedure(new DropPipeProcedure(pipeName));
+      List<TSStatus> statusList = new ArrayList<>();
+      boolean isSucceed =
+          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+      if (isSucceed) {
+        return RpcUtils.SUCCESS_STATUS;
+      } else {
+        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+            .setMessage(statusList.get(0).getMessage());
+      }
+    } catch (PipeException e) {
+      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+  }
+
   /**
    * Waiting until the specific procedures finished
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
index 10af4dac17..c8af8831f8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
@@ -18,20 +18,40 @@
  */
 package org.apache.iotdb.confignode.manager;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.exception.sync.PipeSinkException;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
+import org.apache.iotdb.confignode.consensus.response.PipeResp;
 import org.apache.iotdb.confignode.consensus.response.PipeSinkResp;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class SyncManager {
@@ -88,10 +108,84 @@ public class SyncManager {
   // endregion
 
   // ======================================================
-  // region Implement of PipeS
+  // region Implement of Pipe
   // ======================================================
 
-  // TODO....
+  public void checkAddPipe(PipeInfo pipeInfo) throws PipeException {
+    clusterSyncInfo.checkAddPipe(pipeInfo);
+  }
+
+  public TSStatus preCreatePipe(PipeInfo pipeInfo) {
+    pipeInfo.setStatus(PipeStatus.PREPARE_CREATE);
+    return getConsensusManager().write(new PreCreatePipePlan(pipeInfo)).getStatus();
+  }
+
+  public TSStatus setPipeStatus(String pipeName, PipeStatus pipeStatus) {
+    return getConsensusManager().write(new SetPipeStatusPlan(pipeName, pipeStatus)).getStatus();
+  }
+
+  public TShowPipeResp showPipe(String pipeName) {
+    ShowPipePlan showPipePlan = new ShowPipePlan(pipeName);
+    PipeResp pipeResp = (PipeResp) getConsensusManager().read(showPipePlan).getDataset();
+    TShowPipeResp resp = new TShowPipeResp();
+    resp.setStatus(pipeResp.getStatus());
+    if (pipeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      resp.setPipeInfoList(
+          pipeResp.getPipeInfoList().stream()
+              .map(PipeInfo::getTShowPipeInfo)
+              .collect(Collectors.toList()));
+    }
+    return resp;
+  }
+
+  public PipeInfo getPipeInfo(String pipeName) throws PipeException {
+    return clusterSyncInfo.getPipeInfo(pipeName);
+  }
+
+  /**
+   * Broadcast DataNodes to operate PIPE operation.
+   *
+   * @param pipeName name of PIPE
+   * @param operation only support {@link SyncOperation#START_PIPE}, {@link SyncOperation#STOP_PIPE}
+   *     and {@link SyncOperation#DROP_PIPE}
+   * @return list of TSStatus taken from the client handler after the request has been sent
+   */
+  public List<TSStatus> operatePipeOnDataNodes(String pipeName, SyncOperation operation) {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final TOperatePipeOnDataNodeReq request =
+        new TOperatePipeOnDataNodeReq(pipeName, (byte) operation.ordinal());
+
+    AsyncClientHandler<TOperatePipeOnDataNodeReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.OPERATE_PIPE, request, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+
+    // NOTE(review): the handler is assumed to expose the DataNode replies via getResponseList();
+    // the list previously returned here was never filled, so failures could not be reported.
+    return clientHandler.getResponseList();
+  }
+
+  /**
+   * Broadcast DataNodes to pre create PIPE
+   *
+   * @param pipeInfo pipeInfo
+   * @return list of TSStatus taken from the client handler after the request has been sent
+   */
+  public List<TSStatus> preCreatePipeOnDataNodes(PipeInfo pipeInfo) {
+    NodeManager nodeManager = configManager.getNodeManager();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        nodeManager.getRegisteredDataNodeLocations();
+    final TCreatePipeOnDataNodeReq request =
+        new TCreatePipeOnDataNodeReq(pipeInfo.serializeToByteBuffer());
+
+    AsyncClientHandler<TCreatePipeOnDataNodeReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.PRE_CREATE_PIPE, request, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    // NOTE(review): the handler is assumed to expose the DataNode replies via getResponseList();
+    // the list previously returned here was never filled, so failures could not be reported.
+    return clientHandler.getResponseList();
+  }
 
   // endregion
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 64b488cdca..983aafcf79 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -62,6 +62,9 @@ import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimeP
 import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
@@ -173,6 +176,8 @@ public class ConfigPlanExecutor {
         return clusterSchemaInfo.getAllTemplateSetInfo();
       case GetPipeSink:
         return syncInfo.getPipeSink((GetPipeSinkPlan) req);
+      case ShowPipe:
+        return syncInfo.showPipe((ShowPipePlan) req);
       case GetTriggerTable:
         return triggerInfo.getTriggerTable();
       case GetRouting:
@@ -268,6 +273,10 @@ public class ConfigPlanExecutor {
         return syncInfo.addPipeSink((CreatePipeSinkPlan) physicalPlan);
       case DropPipeSink:
         return syncInfo.dropPipeSink((DropPipeSinkPlan) physicalPlan);
+      case PreCreatePipe:
+        return syncInfo.preCreatePipe((PreCreatePipePlan) physicalPlan);
+      case SetPipeStatus:
+        return syncInfo.operatePipe((SetPipeStatusPlan) physicalPlan);
       default:
         throw new UnknownPhysicalPlanTypeException(physicalPlan.getType());
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
index c42dd0d75f..1cda416fe8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
@@ -19,12 +19,19 @@
 package org.apache.iotdb.confignode.persistence.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.exception.sync.PipeNotExistException;
 import org.apache.iotdb.commons.exception.sync.PipeSinkException;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.commons.sync.metadata.SyncMetadata;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
+import org.apache.iotdb.confignode.consensus.response.PipeResp;
 import org.apache.iotdb.confignode.consensus.response.PipeSinkResp;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -37,7 +44,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 public class ClusterSyncInfo implements SnapshotProcessor {
 
@@ -65,9 +74,10 @@ public class ClusterSyncInfo implements SnapshotProcessor {
   public TSStatus addPipeSink(CreatePipeSinkPlan plan) {
     TSStatus status = new TSStatus();
     try {
-      syncMetadata.addPipeSink(SyncPipeUtil.parsePipeInfoAsPipe(plan.getPipeSinkInfo()));
+      syncMetadata.addPipeSink(SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(plan.getPipeSinkInfo()));
       status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (PipeSinkException e) {
+      LOGGER.error("failed to execute CreatePipeSinkPlan {} on ClusterSyncInfo", plan, e);
       status.setCode(TSStatusCode.PIPESINK_ERROR.getStatusCode());
       LOGGER.error(e.getMessage());
     }
@@ -103,6 +113,73 @@ public class ClusterSyncInfo implements SnapshotProcessor {
 
   // endregion
 
+  // ======================================================
+  // region Implement of Pipe
+  // ======================================================
+
+  /**
+   * Check a Pipe before the create operation; the check is delegated to the sync metadata.
+   *
+   * @param pipeInfo pipe info
+   * @throws PipeException if a Pipe with the same name exists or the PipeSink does not exist
+   */
+  public void checkAddPipe(PipeInfo pipeInfo) throws PipeException {
+    syncMetadata.checkAddPipe(pipeInfo);
+  }
+
+  public TSStatus preCreatePipe(PreCreatePipePlan physicalPlan) {
+    syncMetadata.addPipe(physicalPlan.getPipeInfo());
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  public TSStatus operatePipe(SetPipeStatusPlan physicalPlan) {
+    TSStatus status = new TSStatus();
+    try {
+      syncMetadata.setPipeStatus(physicalPlan.getPipeName(), physicalPlan.getPipeStatus());
+      status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (PipeException e) {
+      LOGGER.error("failed to execute OperatePipePlan {} on ClusterSyncInfo", physicalPlan, e);
+      // keep the reason in the status: the pipe procedures report status.getMessage() on failure
+      status.setCode(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+    return status;
+  }
+
+  public PipeResp showPipe(ShowPipePlan plan) {
+    // Filter by name only when one is given. TODO(sync): optimize logic later
+    final String wanted = plan.getPipeName();
+    List<PipeInfo> selected = syncMetadata.getAllPipeInfos();
+    if (!StringUtils.isEmpty(wanted)) {
+      final List<PipeInfo> everyPipe = selected;
+      selected = new ArrayList<>();
+      for (PipeInfo candidate : everyPipe) {
+        if (wanted.equals(candidate.getPipeName())) {
+          selected.add(candidate);
+        }
+      }
+    }
+    PipeResp resp = new PipeResp();
+    resp.setPipeInfoList(selected);
+    resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+    return resp;
+  }
+
+  /**
+   * Get PipeInfo by pipeName from the running pipe; checked before start, stop and drop.
+   *
+   * @param pipeName pipe name
+   * @throws PipeNotExistException if no running pipe is found or its name differs from pipeName
+   */
+  public PipeInfo getPipeInfo(String pipeName) throws PipeNotExistException {
+    PipeInfo pipeInfo = syncMetadata.getRunningPipeInfo();
+    if (pipeInfo == null || !pipeInfo.getPipeName().equals(pipeName)) {
+      throw new PipeNotExistException(pipeName);
+    }
+    return pipeInfo;
+  }
+
+  // endregion
+
   // ======================================================
   // region Implement of Snapshot
   // ======================================================
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 6862be0a4d..44d6549f96 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -83,6 +83,9 @@ public class ConfigNodeProcedureEnv {
   /** add or remove node lock */
   private final LockQueue nodeLock = new LockQueue();
 
+  /** pipe operation lock */
+  private final LockQueue pipeLock = new LockQueue();
+
   private final ReentrantLock schedulerLock = new ReentrantLock();
 
   private final ConfigManager configManager;
@@ -551,6 +554,10 @@ public class ConfigNodeProcedureEnv {
     return nodeLock;
   }
 
+  public LockQueue getPipeLock() {
+    return pipeLock;
+  }
+
   public ProcedureScheduler getScheduler() {
     return scheduler;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java
new file mode 100644
index 0000000000..81b576419d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.sync;
+
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
+import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This procedure manages four kinds of PIPE operations: CREATE, START, STOP and DROP */
+abstract class AbstractOperatePipeProcedure
+    extends StateMachineProcedure<ConfigNodeProcedureEnv, OperatePipeState> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractOperatePipeProcedure.class);
+
+  private static final int retryThreshold = 3;
+
+  /**
+   * Execute at state OPERATE_CHECK
+   *
+   * @return true if the procedure can finish directly, without running the remaining states
+   */
+  abstract boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException;
+
+  /** Execute at state PRE_OPERATE_PIPE_CONFIGNODE */
+  abstract void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException;
+
+  /** Execute at state OPERATE_PIPE_DATANODE */
+  abstract void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException;
+
+  /** Execute at state OPERATE_PIPE_CONFIGNODE */
+  abstract void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException;
+
+  abstract SyncOperation getOperation();
+
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeState state)
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+    try {
+      switch (state) {
+        case OPERATE_CHECK:
+          if (executeCheckCanSkip(env)) {
+            return Flow.NO_MORE_STATE;
+          }
+          setNextState(OperatePipeState.PRE_OPERATE_PIPE_CONFIGNODE);
+          break;
+        case PRE_OPERATE_PIPE_CONFIGNODE:
+          executePreOperatePipeOnConfigNode(env);
+          setNextState(OperatePipeState.OPERATE_PIPE_DATANODE);
+          break;
+        case OPERATE_PIPE_DATANODE:
+          executeOperatePipeOnDataNode(env);
+          setNextState(OperatePipeState.OPERATE_PIPE_CONFIGNODE);
+          break;
+        case OPERATE_PIPE_CONFIGNODE:
+          executeOperatePipeOnConfigNode(env);
+          return Flow.NO_MORE_STATE;
+      }
+    } catch (PipeException e) {
+      if (isRollbackSupported(state)) {
+        LOGGER.error("Fail in OperatePipeProcedure", e);
+        setFailure(new ProcedureException(e.getMessage()));
+      } else {
+        LOGGER.error("Retrievable error trying to {} at state [{}]", getOperation(), state, e);
+        if (getCycles() > retryThreshold) {
+          setFailure(
+              new ProcedureException(
+                  String.format("Fail to %s because %s", getOperation().name(), e.getMessage())));
+        }
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv env) {
+    env.getSchedulerLock().lock();
+    try {
+      if (env.getPipeLock().tryLock(this)) {
+        LOGGER.info("procedureId {} acquire lock.", getProcId());
+        return ProcedureLockState.LOCK_ACQUIRED;
+      }
+      env.getPipeLock().waitProcedure(this);
+      LOGGER.info("procedureId {} wait for lock.", getProcId());
+      return ProcedureLockState.LOCK_EVENT_WAIT;
+    } finally {
+      env.getSchedulerLock().unlock();
+    }
+  }
+
+  @Override
+  protected void releaseLock(ConfigNodeProcedureEnv env) {
+    env.getSchedulerLock().lock();
+    try {
+      LOGGER.info("procedureId {} release lock.", getProcId());
+      if (env.getPipeLock().releaseLock(this)) {
+        env.getPipeLock().wakeWaitingProcedures(env.getScheduler());
+      }
+    } finally {
+      env.getSchedulerLock().unlock();
+    }
+  }
+
+  @Override
+  protected OperatePipeState getState(int stateId) {
+    return OperatePipeState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(OperatePipeState state) {
+    return state.ordinal();
+  }
+
+  @Override
+  protected OperatePipeState getInitialState() {
+    return OperatePipeState.OPERATE_CHECK;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
new file mode 100644
index 0000000000..25862974ac
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.sync;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/** Procedure creating a PIPE; it fills in the steps of AbstractOperatePipeProcedure. */
+public class CreatePipeProcedure extends AbstractOperatePipeProcedure {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CreatePipeProcedure.class);
+
+  private PipeInfo pipeInfo;
+
+  public CreatePipeProcedure() {}
+
+  public CreatePipeProcedure(TPipeInfo pipeInfo) throws PipeException {
+    this.pipeInfo = SyncPipeUtil.parseTPipeInfoAsPipeInfo(pipeInfo, System.currentTimeMillis());
+  }
+
+  @Override
+  boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to create PIPE [{}]", pipeInfo.getPipeName());
+    env.getConfigManager().getSyncManager().checkAddPipe(pipeInfo);
+    // creation is never skipped; a failed check is raised as PipeException by checkAddPipe
+    return false;
+  }
+
+  @Override
+  void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to pre-create PIPE [{}] on Config Nodes", pipeInfo.getPipeName());
+    final TSStatus result = env.getConfigManager().getSyncManager().preCreatePipe(pipeInfo);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(result.getMessage());
+    }
+  }
+
+  @Override
+  void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to broadcast create PIPE [{}] on Data Nodes", pipeInfo.getPipeName());
+    final TSStatus merged =
+        RpcUtils.squashResponseStatusList(
+            env.getConfigManager().getSyncManager().preCreatePipeOnDataNodes(pipeInfo));
+    if (merged.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Fail to create PIPE [%s] on Data Nodes because %s",
+              pipeInfo.getPipeName(), merged.getMessage()));
+    }
+  }
+
+  @Override
+  void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+    final String name = pipeInfo.getPipeName();
+    LOGGER.info("Start to create PIPE [{}] on Config Nodes", name);
+    final TSStatus result =
+        env.getConfigManager().getSyncManager().setPipeStatus(name, PipeStatus.STOP);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(result.getMessage());
+    }
+  }
+
+  @Override
+  SyncOperation getOperation() {
+    return SyncOperation.CREATE_PIPE;
+  }
+
+  @Override
+  protected boolean isRollbackSupported(OperatePipeState state) {
+    switch (state) {
+      case OPERATE_CHECK:
+      case OPERATE_PIPE_DATANODE:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
+      throws IOException, InterruptedException, ProcedureException {
+    LOGGER.error("Roll back CreatePipeProcedure at STATE [{}]", state);
+    // TODO(sync): roll back logic;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeInt(ProcedureFactory.ProcedureType.CREATE_PIPE_PROCEDURE.ordinal());
+    super.serialize(stream);
+    pipeInfo.serialize(stream);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    pipeInfo = PipeInfo.deserializePipeInfo(byteBuffer);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    return o != null && getClass() == o.getClass()
+        && Objects.equals(pipeInfo, ((CreatePipeProcedure) o).pipeInfo);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(pipeInfo);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
new file mode 100644
index 0000000000..54a1dfd37f
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.sync;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+// TODO(sync): drop logic need to be updated
+/** Procedure dropping a PIPE; it fills in the steps of AbstractOperatePipeProcedure. */
+public class DropPipeProcedure extends AbstractOperatePipeProcedure {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DropPipeProcedure.class);
+
+  private String pipeName;
+
+  public DropPipeProcedure() {}
+
+  public DropPipeProcedure(String pipeName) throws PipeException {
+    this.pipeName = pipeName;
+  }
+
+  @Override
+  boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to drop PIPE [{}]", pipeName);
+    final PipeInfo existing = env.getConfigManager().getSyncManager().getPipeInfo(pipeName);
+    // the remaining states are skipped when the PIPE already carries the DROP status
+    return existing.getStatus().equals(PipeStatus.DROP);
+  }
+
+  @Override
+  void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to pre-drop PIPE [{}] on Config Nodes", pipeName);
+    final TSStatus result =
+        env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.PREPARE_DROP);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(result.getMessage());
+    }
+  }
+
+  @Override
+  void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to broadcast drop PIPE [{}] on Data Nodes", pipeName);
+    final TSStatus merged =
+        RpcUtils.squashResponseStatusList(
+            env.getConfigManager()
+                .getSyncManager()
+                .operatePipeOnDataNodes(pipeName, SyncOperation.DROP_PIPE));
+    if (merged.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Fail to drop PIPE [%s] on Data Nodes because %s", pipeName, merged.getMessage()));
+    }
+  }
+
+  @Override
+  void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to drop PIPE [{}] on Config Nodes", pipeName);
+    final TSStatus result =
+        env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.DROP);
+    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(result.getMessage());
+    }
+  }
+
+  @Override
+  SyncOperation getOperation() {
+    return SyncOperation.DROP_PIPE;
+  }
+
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
+      throws IOException, InterruptedException, ProcedureException {}
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeInt(ProcedureFactory.ProcedureType.DROP_PIPE_PROCEDURE.ordinal());
+    super.serialize(stream);
+    ReadWriteIOUtils.write(pipeName, stream);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    pipeName = ReadWriteIOUtils.readString(byteBuffer);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    return o != null && getClass() == o.getClass()
+        && Objects.equals(pipeName, ((DropPipeProcedure) o).pipeName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(pipeName);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
new file mode 100644
index 0000000000..7061443503
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.sync;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/** Pipe procedure whose {@link #getOperation()} reports {@code SyncOperation.START_PIPE}. */
+public class StartPipeProcedure extends AbstractOperatePipeProcedure {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StartPipeProcedure.class);
+
+  private String pipeName;
+
+  /** Leaves {@code pipeName} unset; {@link #deserialize} assigns it. */
+  public StartPipeProcedure() {}
+
+  public StartPipeProcedure(String pipeName) throws PipeException {
+    this.pipeName = pipeName;
+  }
+
+  @Override
+  boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to start PIPE [{}]", pipeName);
+    // True when the status recorded for this pipe is already RUNNING.
+    PipeInfo recorded = env.getConfigManager().getSyncManager().getPipeInfo(pipeName);
+    return recorded.getStatus().equals(PipeStatus.RUNNING);
+  }
+
+  @Override
+  void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to pre-start PIPE [{}] on Config Nodes", pipeName);
+    TSStatus preStartResult =
+        env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.PREPARE_START);
+    if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != preStartResult.getCode()) {
+      throw new PipeException(preStartResult.getMessage());
+    }
+  }
+
+  @Override
+  void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to broadcast start PIPE [{}] on Data Nodes", pipeName);
+    TSStatus broadcastResult =
+        RpcUtils.squashResponseStatusList(
+            env.getConfigManager()
+                .getSyncManager()
+                .operatePipeOnDataNodes(pipeName, SyncOperation.START_PIPE));
+    if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != broadcastResult.getCode()) {
+      String reason = broadcastResult.getMessage();
+      throw new PipeException(
+          String.format("Fail to start PIPE [%s] on Data Nodes because %s", pipeName, reason));
+    }
+  }
+
+  @Override
+  void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to start PIPE [{}] on Config Nodes", pipeName);
+    TSStatus runningResult =
+        env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.RUNNING);
+    if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != runningResult.getCode()) {
+      throw new PipeException(runningResult.getMessage());
+    }
+  }
+
+  @Override
+  SyncOperation getOperation() {
+    return SyncOperation.START_PIPE;
+  }
+
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
+      throws IOException, InterruptedException, ProcedureException {} // no-op: nothing is undone
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeInt(ProcedureFactory.ProcedureType.START_PIPE_PROCEDURE.ordinal()); // type first
+    super.serialize(stream); // then whatever the superclass writes
+    ReadWriteIOUtils.write(pipeName, stream); // pipeName last; deserialize reads it after super
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer); // superclass part first, mirroring serialize
+    pipeName = ReadWriteIOUtils.readString(byteBuffer);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) return true; // same instance
+    if (o == null || o.getClass() != getClass()) return false; // null or a different class
+    StartPipeProcedure other = (StartPipeProcedure) o;
+    return Objects.equals(pipeName, other.pipeName); // pipeName is the only field compared
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(pipeName);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
new file mode 100644
index 0000000000..086811f523
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.sync;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/** Pipe procedure whose {@link #getOperation()} reports {@code SyncOperation.STOP_PIPE}. */
+public class StopPipeProcedure extends AbstractOperatePipeProcedure {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StopPipeProcedure.class);
+
+  private String pipeName;
+
+  /** Leaves {@code pipeName} unset; {@link #deserialize} assigns it. */
+  public StopPipeProcedure() {}
+
+  public StopPipeProcedure(String pipeName) throws PipeException {
+    this.pipeName = pipeName;
+  }
+
+  @Override
+  boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to stop PIPE [{}]", pipeName);
+    // True when the status recorded for this pipe is already STOP.
+    PipeInfo recorded = env.getConfigManager().getSyncManager().getPipeInfo(pipeName);
+    return recorded.getStatus().equals(PipeStatus.STOP);
+  }
+
+  @Override
+  void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to pre-stop PIPE [{}] on Config Nodes", pipeName);
+    TSStatus preStopResult =
+        env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.PREPARE_STOP);
+    if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != preStopResult.getCode()) {
+      throw new PipeException(preStopResult.getMessage());
+    }
+  }
+
+  @Override
+  void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to broadcast stop PIPE [{}] on Data Nodes", pipeName);
+    TSStatus broadcastResult =
+        RpcUtils.squashResponseStatusList(
+            env.getConfigManager()
+                .getSyncManager()
+                .operatePipeOnDataNodes(pipeName, SyncOperation.STOP_PIPE));
+    if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != broadcastResult.getCode()) {
+      String reason = broadcastResult.getMessage();
+      throw new PipeException(
+          String.format("Fail to stop PIPE [%s] on Data Nodes because %s", pipeName, reason));
+    }
+  }
+
+  @Override
+  void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
+    LOGGER.info("Start to stop PIPE [{}] on Config Nodes", pipeName);
+    TSStatus stopResult =
+        env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.STOP);
+    if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != stopResult.getCode()) {
+      throw new PipeException(stopResult.getMessage());
+    }
+  }
+
+  @Override
+  SyncOperation getOperation() {
+    return SyncOperation.STOP_PIPE;
+  }
+
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
+      throws IOException, InterruptedException, ProcedureException {} // no-op: nothing is undone
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeInt(ProcedureFactory.ProcedureType.STOP_PIPE_PROCEDURE.ordinal()); // type first
+    super.serialize(stream); // then whatever the superclass writes
+    ReadWriteIOUtils.write(pipeName, stream); // pipeName last; deserialize reads it after super
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer); // superclass part first, mirroring serialize
+    pipeName = ReadWriteIOUtils.readString(byteBuffer);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) return true; // same instance
+    if (o == null || o.getClass() != getClass()) return false; // null or a different class
+    StopPipeProcedure other = (StopPipeProcedure) o;
+    return Objects.equals(pipeName, other.pipeName); // pipeName is the only field compared
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(pipeName);
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/sync/OperatePipeState.java
similarity index 81%
copy from node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/sync/OperatePipeState.java
index 0d2a766f37..c8a263ec43 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/sync/OperatePipeState.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.sync.pipe;
+package org.apache.iotdb.confignode.procedure.state.sync;
 
-public enum PipeStatus {
-  // a new pipe should be stop status
-  RUNNING,
-  STOP,
-  DROP
+public enum OperatePipeState {
+  OPERATE_CHECK, // NOTE(review): presumably the executeCheckCanSkip step — confirm
+  PRE_OPERATE_PIPE_CONFIGNODE, // NOTE(review): presumably executePreOperatePipeOnConfigNode
+  OPERATE_PIPE_DATANODE, // NOTE(review): presumably executeOperatePipeOnDataNode
+  OPERATE_PIPE_CONFIGNODE // NOTE(review): presumably executeOperatePipeOnConfigNode
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 6951d3cb0e..a28eff1c8d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -29,6 +29,10 @@ import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroup
 import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteStorageGroupProcedure;
 import org.apache.iotdb.confignode.procedure.impl.statemachine.DeleteTimeSeriesProcedure;
 import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.CreatePipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.DropPipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.StartPipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.StopPipeProcedure;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,6 +81,18 @@ public class ProcedureFactory implements IProcedureFactory {
       case DROP_TRIGGER_PROCEDURE:
         procedure = new DropTriggerProcedure();
         break;
+      case CREATE_PIPE_PROCEDURE:
+        procedure = new CreatePipeProcedure();
+        break;
+      case START_PIPE_PROCEDURE:
+        procedure = new StartPipeProcedure();
+        break;
+      case STOP_PIPE_PROCEDURE:
+        procedure = new StopPipeProcedure();
+        break;
+      case DROP_PIPE_PROCEDURE:
+        procedure = new DropPipeProcedure();
+        break;
       default:
         LOGGER.error("unknown Procedure type: " + typeNum);
         throw new IOException("unknown Procedure type: " + typeNum);
@@ -104,6 +120,14 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.CREATE_TRIGGER_PROCEDURE;
     } else if (procedure instanceof DropTriggerProcedure) {
       return ProcedureType.DROP_TRIGGER_PROCEDURE;
+    } else if (procedure instanceof CreatePipeProcedure) {
+      return ProcedureType.CREATE_PIPE_PROCEDURE;
+    } else if (procedure instanceof StartPipeProcedure) {
+      return ProcedureType.START_PIPE_PROCEDURE;
+    } else if (procedure instanceof StopPipeProcedure) {
+      return ProcedureType.STOP_PIPE_PROCEDURE;
+    } else if (procedure instanceof DropPipeProcedure) {
+      return ProcedureType.DROP_PIPE_PROCEDURE;
     }
     return null;
   }
@@ -117,7 +141,11 @@ public class ProcedureFactory implements IProcedureFactory {
     CREATE_REGION_GROUPS,
     DELETE_TIMESERIES_PROCEDURE,
     CREATE_TRIGGER_PROCEDURE,
-    DROP_TRIGGER_PROCEDURE
+    DROP_TRIGGER_PROCEDURE,
+    CREATE_PIPE_PROCEDURE,
+    START_PIPE_PROCEDURE,
+    STOP_PIPE_PROCEDURE,
+    DROP_PIPE_PROCEDURE
   }
 
   private static class ProcedureFactoryHolder {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 5638835c5c..8381d6e2cc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -101,6 +101,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -116,6 +117,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
@@ -620,6 +623,31 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return configManager.getPipeSink(req);
   }
 
+  @Override
+  public TSStatus createPipe(TPipeInfo req) {
+    return configManager.createPipe(req); // forwards the request to configManager unchanged
+  }
+
+  @Override
+  public TSStatus startPipe(String pipeName) {
+    return configManager.startPipe(pipeName); // forwards the pipe name to configManager
+  }
+
+  @Override
+  public TSStatus stopPipe(String pipeName) {
+    return configManager.stopPipe(pipeName); // forwards the pipe name to configManager
+  }
+
+  @Override
+  public TSStatus dropPipe(String pipeName) {
+    return configManager.dropPipe(pipeName); // forwards the pipe name to configManager
+  }
+
+  @Override
+  public TShowPipeResp showPipe(TShowPipeReq req) {
+    return configManager.showPipe(req); // forwards the request to configManager unchanged
+  }
+
   @Override
   @TestOnly
   public TGetRoutingResp getRouting(TGetRoutingReq req) {
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 92ad262529..ba23c19619 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -35,6 +35,9 @@ import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.SeriesPartitionTable;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
 import org.apache.iotdb.confignode.consensus.request.read.CountStorageGroupPlan;
@@ -74,6 +77,9 @@ import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimeP
 import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
@@ -898,6 +904,42 @@ public class ConfigPhysicalPlanSerDeTest {
         getPipeSinkPlanWithNullName1.getPipeSinkName());
   }
 
+  @Test
+  public void PreCreatePipePlanTest() throws IOException {
+    PipeInfo pipeInfo =
+        new TsFilePipeInfo(
+            "name", "demo", PipeStatus.PREPARE_CREATE, System.currentTimeMillis(), 999, false);
+    PreCreatePipePlan preCreatePipePlan = new PreCreatePipePlan(pipeInfo); // lowerCamelCase local
+    PreCreatePipePlan preCreatePipePlan1 = // rebuilt from the serialized bytes via the factory
+        (PreCreatePipePlan)
+            ConfigPhysicalPlan.Factory.create(preCreatePipePlan.serializeToByteBuffer());
+    Assert.assertEquals(preCreatePipePlan.getPipeInfo(), preCreatePipePlan1.getPipeInfo());
+  }
+
+  @Test
+  public void SetPipeStatusPlan() throws IOException {
+    // Serialize, rebuild through the factory, and compare the fields.
+    SetPipeStatusPlan original = new SetPipeStatusPlan("pipe", PipeStatus.PREPARE_CREATE);
+    SetPipeStatusPlan restored =
+        (SetPipeStatusPlan) ConfigPhysicalPlan.Factory.create(original.serializeToByteBuffer());
+    Assert.assertEquals(original.getPipeName(), restored.getPipeName());
+    Assert.assertEquals(original.getPipeStatus(), restored.getPipeStatus());
+  }
+
+  @Test
+  public void ShowPipePlanTest() throws IOException {
+    // Case 1: a plan carrying an explicit pipe name.
+    ShowPipePlan named = new ShowPipePlan("demo");
+    ShowPipePlan namedCopy =
+        (ShowPipePlan) ConfigPhysicalPlan.Factory.create(named.serializeToByteBuffer());
+    Assert.assertEquals(named.getPipeName(), namedCopy.getPipeName());
+    // Case 2: a plan built with the no-arg constructor.
+    ShowPipePlan unnamed = new ShowPipePlan();
+    ShowPipePlan unnamedCopy =
+        (ShowPipePlan) ConfigPhysicalPlan.Factory.create(unnamed.serializeToByteBuffer());
+    Assert.assertEquals(unnamed.getPipeName(), unnamedCopy.getPipeName());
+  }
+
   @Test
   public void GetTriggerTablePlan() throws IOException {
     GetTriggerTablePlan getTriggerTablePlan0 = new GetTriggerTablePlan();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/OperatePipeProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/OperatePipeProcedureTest.java
new file mode 100644
index 0000000000..16190ea3eb
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/OperatePipeProcedureTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.confignode.procedure.impl.sync.CreatePipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.DropPipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.StartPipeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.sync.StopPipeProcedure;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class OperatePipeProcedureTest {
+
+  @Test
+  public void serializeDeserializeCreatePipeProcedureTest() throws PipeException {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+    Map<String, String> attributes = new HashMap<>();
+    attributes.put("syncdelop", "false");
+    TPipeInfo pipeInfo =
+        new TPipeInfo()
+            .setPipeName("PipeName")
+            .setPipeSinkName("PipeSinkName")
+            .setStartTime(999)
+            .setAttributes(attributes);
+
+    CreatePipeProcedure p1 = new CreatePipeProcedure(pipeInfo);
+
+    try {
+      p1.serialize(outputStream);
+      ByteBuffer buffer = // wrap only the bytes actually written
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+      CreatePipeProcedure p2 = (CreatePipeProcedure) ProcedureFactory.getInstance().create(buffer);
+      assertEquals(p1, p2);
+    } catch (Exception e) {
+      fail("CreatePipeProcedure serialization round trip failed: " + e); // keep the reason
+    }
+  }
+
+  @Test
+  public void serializeDeserializeStartPipeProcedureTest() throws PipeException {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+    StartPipeProcedure p1 = new StartPipeProcedure("pipeName");
+
+    try {
+      p1.serialize(outputStream);
+      ByteBuffer buffer = // wrap only the bytes actually written
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+      StartPipeProcedure p2 = (StartPipeProcedure) ProcedureFactory.getInstance().create(buffer);
+      assertEquals(p1, p2);
+    } catch (Exception e) {
+      fail("StartPipeProcedure serialization round trip failed: " + e); // keep the reason
+    }
+  }
+
+  @Test
+  public void serializeDeserializeStopPipeProcedureTest() throws PipeException {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+    StopPipeProcedure p1 = new StopPipeProcedure("pipeName");
+
+    try {
+      p1.serialize(outputStream);
+      ByteBuffer buffer = // wrap only the bytes actually written
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+      StopPipeProcedure p2 = (StopPipeProcedure) ProcedureFactory.getInstance().create(buffer);
+      assertEquals(p1, p2);
+    } catch (Exception e) {
+      fail("StopPipeProcedure serialization round trip failed: " + e); // keep the reason
+    }
+  }
+
+  @Test
+  public void serializeDeserializeDropPipeProcedureTest() throws PipeException {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+    DropPipeProcedure p1 = new DropPipeProcedure("pipeName");
+
+    try {
+      p1.serialize(outputStream);
+      ByteBuffer buffer = // wrap only the bytes actually written
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+
+      DropPipeProcedure p2 = (DropPipeProcedure) ProcedureFactory.getInstance().create(buffer);
+      assertEquals(p1, p2);
+    } catch (Exception e) {
+      fail("DropPipeProcedure serialization round trip failed: " + e); // keep the reason
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
index 106e837a23..ad4ff7a3b9 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/sync/IoTDBPipeIT.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.it.sync;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
 import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 
 import org.junit.AfterClass;
@@ -37,7 +38,7 @@ import java.sql.Statement;
 import static org.apache.iotdb.db.it.utils.TestUtils.assertResultSetEqual;
 
 @RunWith(IoTDBTestRunner.class)
-@Category({LocalStandaloneIT.class})
+@Category({LocalStandaloneIT.class, ClusterIT.class})
 public class IoTDBPipeIT {
   private static String ip;
   private static int port;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeNotExistException.java
similarity index 68%
copy from node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeNotExistException.java
index d2bfdf743b..3edb2a1463 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeNotExistException.java
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.sync.pipe;
+package org.apache.iotdb.commons.exception.sync;
 
-public enum SyncOperation {
-  CREATE_PIPESINK,
-  DROP_PIPESINK,
-  CREATE_PIPE,
-  START_PIPE,
-  STOP_PIPE,
-  DROP_PIPE
+public class PipeNotExistException extends PipeException {
+  public PipeNotExistException(String pipeName, int errorCode) {
+    super(String.format("PIPE [%s] does not exist", pipeName), errorCode); // message + code
+  }
+
+  public PipeNotExistException(String pipeName) {
+    super(String.format("PIPE [%s] does not exist", pipeName)); // same message, no code given
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
index 57043817f7..d8079dd217 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/metadata/SyncMetadata.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.commons.sync.metadata;
 
 import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.exception.sync.PipeNotExistException;
 import org.apache.iotdb.commons.exception.sync.PipeSinkException;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
@@ -145,35 +146,30 @@ public class SyncMetadata implements SnapshotProcessor {
   // region Implement of Pipe
   // ======================================================
 
-  public void addPipe(PipeInfo pipeInfo, PipeSink pipeSink) throws PipeException {
-    // common check
+  public void checkAddPipe(PipeInfo pipeInfo) throws PipeException { // validation only
+    // the PIPESINK named by pipeInfo must already exist
+    if (!isPipeSinkExist(pipeInfo.getPipeSinkName())) {
+      throw new PipeException(
+          String.format("can not find PIPESINK %s.", pipeInfo.getPipeSinkName()));
+    }
+    // reject when a PIPE is already recorded and its status is not DROP
     if (runningPipe != null && runningPipe.getStatus() != PipeStatus.DROP) {
       throw new PipeException(
           String.format(
-              "Pipe %s is %s, please retry after drop it.",
+              "PIPE %s is %s, please retry after drop it.",
               runningPipe.getPipeName(), runningPipe.getStatus().name()));
     }
+  }
+
+  public void addPipe(PipeInfo pipeInfo) { // performs no checks itself; replaces runningPipe
     runningPipe = pipeInfo;
     pipes
         .computeIfAbsent(runningPipe.getPipeName(), i -> new ConcurrentHashMap<>())
         .computeIfAbsent(runningPipe.getCreateTime(), i -> runningPipe);
   }
 
-  public void operatePipe(String pipeName, SyncOperation syncOperation) throws PipeException {
-    checkIfPipeExistAndRunning(pipeName);
-    switch (syncOperation) {
-      case START_PIPE:
-        runningPipe.start();
-        break;
-      case STOP_PIPE:
-        runningPipe.stop();
-        break;
-      case DROP_PIPE:
-        runningPipe.drop();
-        break;
-      default:
-        throw new PipeException("Unknown operatorType " + syncOperation);
-    }
+  public void setPipeStatus(String pipeName, PipeStatus status) throws PipeException {
+    runningPipe.setStatus(status); // NOTE(review): pipeName unused, runningPipe not null-checked
   }
 
   public PipeInfo getPipeInfo(String pipeName, long createTime) {
@@ -193,14 +189,14 @@ public class SyncMetadata implements SnapshotProcessor {
     return runningPipe;
   }
 
-  private void checkIfPipeExistAndRunning(String pipeName) throws PipeException {
+  public void checkIfPipeExist(String pipeName) throws PipeException {
     if (runningPipe == null || runningPipe.getStatus() == PipeStatus.DROP) {
-      throw new PipeException("There is no existing pipe.");
+      throw new PipeNotExistException(pipeName); // nothing recorded, or its status is DROP
     }
     if (!runningPipe.getPipeName().equals(pipeName)) {
       throw new PipeException(
           String.format(
-              "Pipe %s is %s, please retry after drop it.",
+              "PIPE %s is %s, please retry after drop it.", // names the recorded, other PIPE
               runningPipe.getPipeName(), runningPipe.getStatus()));
     }
   }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java
index 2f402caf6d..870b35cbba 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/persistence/SyncLogReader.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.commons.sync.persistence;
 
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
 import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.commons.sync.utils.SyncConstant;
@@ -98,17 +99,17 @@ public class SyncLogReader {
         case STOP_PIPE:
           // TODO: support multiple pipe
           ReadWriteIOUtils.readString(inputStream);
-          runningPipe.stop();
+          runningPipe.setStatus(PipeStatus.STOP);
           break;
         case START_PIPE:
           // TODO: support multiple pipe
           ReadWriteIOUtils.readString(inputStream);
-          runningPipe.start();
+          runningPipe.setStatus(PipeStatus.RUNNING);
           break;
         case DROP_PIPE:
           // TODO: support multiple pipe
           ReadWriteIOUtils.readString(inputStream);
-          runningPipe.drop();
+          runningPipe.setStatus(PipeStatus.DROP);
           break;
         default:
           throw new UnsupportedOperationException(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
index b93da6a86a..074315c716 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeInfo.java
@@ -18,13 +18,22 @@
  */
 package org.apache.iotdb.commons.sync.pipe;
 
+import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 public abstract class PipeInfo {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeInfo.class);
 
   protected String pipeName;
   protected String pipeSinkName;
@@ -85,18 +94,6 @@ public abstract class PipeInfo {
     this.messageType = messageType;
   }
 
-  public void start() {
-    this.status = PipeStatus.RUNNING;
-  }
-
-  public void stop() {
-    this.status = PipeStatus.STOP;
-  }
-
-  public void drop() {
-    this.status = PipeStatus.DROP;
-  }
-
   public long getCreateTime() {
     return createTime;
   }
@@ -105,6 +102,8 @@ public abstract class PipeInfo {
     this.createTime = createTime;
   }
 
+  public abstract TShowPipeInfo getTShowPipeInfo();
+
   public void serialize(OutputStream outputStream) throws IOException {
     ReadWriteIOUtils.write((byte) getType().ordinal(), outputStream);
     ReadWriteIOUtils.write(pipeName, outputStream);
@@ -122,6 +121,25 @@ public abstract class PipeInfo {
     messageType = PipeMessage.PipeMessageType.values()[ReadWriteIOUtils.readByte(inputStream)];
   }
 
+  protected void deserialize(ByteBuffer byteBuffer) {
+    pipeName = ReadWriteIOUtils.readString(byteBuffer);
+    pipeSinkName = ReadWriteIOUtils.readString(byteBuffer);
+    status = PipeStatus.values()[ReadWriteIOUtils.readByte(byteBuffer)];
+    createTime = ReadWriteIOUtils.readLong(byteBuffer);
+    messageType = PipeMessage.PipeMessageType.values()[ReadWriteIOUtils.readByte(byteBuffer)];
+  }
+
+  public ByteBuffer serializeToByteBuffer() {
+    try (PublicBAOS publicBAOS = new PublicBAOS();
+        DataOutputStream dataOutputStream = new DataOutputStream(publicBAOS)) {
+      serialize(dataOutputStream);
+      return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
+    } catch (IOException e) {
+      LOGGER.error("Unexpected error occurred when serializing PipeInfo.");
+      throw new SerializationRunTimeException(e);
+    }
+  }
+
   public static PipeInfo deserializePipeInfo(InputStream inputStream) throws IOException {
     PipeType pipeType = PipeType.values()[ReadWriteIOUtils.readByte(inputStream)];
     PipeInfo pipeInfo;
@@ -138,6 +156,22 @@ public abstract class PipeInfo {
     return pipeInfo;
   }
 
+  public static PipeInfo deserializePipeInfo(ByteBuffer byteBuffer) {
+    PipeType pipeType = PipeType.values()[ReadWriteIOUtils.readByte(byteBuffer)];
+    PipeInfo pipeInfo;
+    switch (pipeType) {
+      case TsFilePipe:
+        pipeInfo = new TsFilePipeInfo();
+        pipeInfo.deserialize(byteBuffer);
+        break;
+      case WALPipe:
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Can not recognize PipeType %s.", pipeType.name()));
+    }
+    return pipeInfo;
+  }
+
   enum PipeType {
     TsFilePipe,
     WALPipe
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java
index 0d2a766f37..09f92c4c25 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/PipeStatus.java
@@ -22,5 +22,10 @@ public enum PipeStatus {
   // a new pipe should be stop status
   RUNNING,
   STOP,
-  DROP
+  DROP,
+  // internal status
+  PREPARE_CREATE,
+  PREPARE_START,
+  PREPARE_STOP,
+  PREPARE_DROP
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
index d2bfdf743b..ec935e4c94 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
@@ -19,8 +19,10 @@
 package org.apache.iotdb.commons.sync.pipe;
 
 public enum SyncOperation {
+  // PIPESINK
   CREATE_PIPESINK,
   DROP_PIPESINK,
+  // PIPE
   CREATE_PIPE,
   START_PIPE,
   STOP_PIPE,
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
index 391d4c65fd..288cb8f80d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/TsFilePipeInfo.java
@@ -18,11 +18,13 @@
  */
 package org.apache.iotdb.commons.sync.pipe;
 
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
 public class TsFilePipeInfo extends PipeInfo {
@@ -76,6 +78,12 @@ public class TsFilePipeInfo extends PipeInfo {
     return PipeType.TsFilePipe;
   }
 
+  @Override
+  public TShowPipeInfo getTShowPipeInfo() {
+    return new TShowPipeInfo(
+        createTime, pipeName, "sender", pipeSinkName, status.name(), messageType.name());
+  }
+
   @Override
   public void serialize(OutputStream outputStream) throws IOException {
     super.serialize(outputStream);
@@ -90,6 +98,13 @@ public class TsFilePipeInfo extends PipeInfo {
     dataStartTimestamp = ReadWriteIOUtils.readLong(inputStream);
   }
 
+  @Override
+  protected void deserialize(ByteBuffer buffer) {
+    super.deserialize(buffer);
+    syncDelOp = ReadWriteIOUtils.readBool(buffer);
+    dataStartTimestamp = ReadWriteIOUtils.readLong(buffer);
+  }
+
   @Override
   public String toString() {
     return "TsFilePipeInfo{"
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
index b0cbf2aef6..4ac32c6899 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/IoTDBPipeSink.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -143,6 +144,13 @@ public class IoTDBPipeSink implements PipeSink {
     port = ReadWriteIOUtils.readInt(inputStream);
   }
 
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    name = ReadWriteIOUtils.readString(buffer);
+    ip = ReadWriteIOUtils.readString(buffer);
+    port = ReadWriteIOUtils.readInt(buffer);
+  }
+
   @Override
   public String toString() {
     return "IoTDBPipeSink{"
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
index 588618d96e..d79aae750c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipesink/PipeSink.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
@@ -49,6 +50,8 @@ public interface PipeSink {
 
   void deserialize(InputStream inputStream) throws IOException;
 
+  void deserialize(ByteBuffer buffer);
+
   static PipeSink deserializePipeSink(InputStream inputStream) throws IOException {
     PipeSinkType pipeSinkType = PipeSinkType.values()[ReadWriteIOUtils.readByte(inputStream)];
     PipeSink pipeSink;
@@ -66,6 +69,23 @@ public interface PipeSink {
     return pipeSink;
   }
 
+  static PipeSink deserializePipeSink(ByteBuffer buffer) {
+    PipeSinkType pipeSinkType = PipeSinkType.values()[ReadWriteIOUtils.readByte(buffer)];
+    PipeSink pipeSink;
+    switch (pipeSinkType) {
+      case IoTDB:
+        pipeSink = new IoTDBPipeSink();
+        pipeSink.deserialize(buffer);
+        break;
+      case ExternalPipe:
+        // TODO(ext-pipe): deserialize external pipesink here
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Can not recognize PipeSinkType %s.", pipeSinkType.name()));
+    }
+    return pipeSink;
+  }
+
   enum PipeSinkType {
     IoTDB,
     ExternalPipe
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
index 53e089dad8..36ba2d3415 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.commons.utils;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 
@@ -136,7 +137,7 @@ public class ThriftConfigNodeSerDeUtils {
     try {
       pipeSinkInfo.write(generateWriteProtocol(stream));
     } catch (TException e) {
-      throw new ThriftSerDeException("Write TConfigNodeLocation failed: ", e);
+      throw new ThriftSerDeException("Write TPipeSinkInfo failed: ", e);
     }
   }
 
@@ -145,8 +146,26 @@ public class ThriftConfigNodeSerDeUtils {
     try {
       pipeSinkInfo.read(generateReadProtocol(buffer));
     } catch (TException e) {
-      throw new ThriftSerDeException("Read TConfigNodeLocation failed: ", e);
+      throw new ThriftSerDeException("Read TPipeSinkInfo failed: ", e);
     }
     return pipeSinkInfo;
   }
+
+  public static void serializeTPipeInfo(TPipeInfo pipeInfo, DataOutputStream stream) {
+    try {
+      pipeInfo.write(generateWriteProtocol(stream));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Write TPipeInfo failed: ", e);
+    }
+  }
+
+  public static TPipeInfo deserializeTPipeInfo(ByteBuffer buffer) {
+    TPipeInfo pipeInfo = new TPipeInfo();
+    try {
+      pipeInfo.read(generateReadProtocol(buffer));
+    } catch (TException e) {
+      throw new ThriftSerDeException("Read TPipeInfo failed: ", e);
+    }
+    return pipeInfo;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index f9dd77cc37..39fae8d887 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
@@ -85,6 +86,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
@@ -1075,6 +1078,86 @@ public class ConfigNodeClient
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  @Override
+  public TSStatus createPipe(TPipeInfo req) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSStatus status = client.createPipe(req);
+        if (!updateConfigNodeLeader(status)) {
+          return status;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      reconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
+  @Override
+  public TSStatus startPipe(String pipeName) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSStatus status = client.startPipe(pipeName);
+        if (!updateConfigNodeLeader(status)) {
+          return status;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      reconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
+  @Override
+  public TSStatus stopPipe(String pipeName) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSStatus status = client.stopPipe(pipeName);
+        if (!updateConfigNodeLeader(status)) {
+          return status;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      reconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
+  @Override
+  public TSStatus dropPipe(String pipeName) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSStatus status = client.dropPipe(pipeName);
+        if (!updateConfigNodeLeader(status)) {
+          return status;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      reconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
+  @Override
+  public TShowPipeResp showPipe(TShowPipeReq req) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TShowPipeResp resp = client.showPipe(req);
+        if (!updateConfigNodeLeader(resp.getStatus())) {
+          return resp;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      reconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   @TestOnly
   public TGetRoutingResp getRouting(TGetRoutingReq req) throws TException {
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index bcbdd4d0ff..c0737f884e 100644
--- a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -55,7 +55,7 @@ import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.commons.utils.AuthUtils;
 import org.apache.iotdb.commons.utils.StatusUtils;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -95,6 +95,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.rescon.MemTableManager;
 import org.apache.iotdb.db.sync.SyncService;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -1337,7 +1338,7 @@ public class LocalConfigNode {
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
   }
 
-  public List<PipeSink> showPipeSink(String pipeSinkName) {
+  public List<PipeSink> showPipeSink(String pipeSinkName) throws PipeSinkException {
     boolean showAll = StringUtils.isEmpty(pipeSinkName);
     if (showAll) {
       return syncService.getAllPipeSink();
@@ -1348,7 +1349,9 @@ public class LocalConfigNode {
 
   public TSStatus createPipe(CreatePipeStatement createPipeStatement) {
     try {
-      syncService.addPipe(createPipeStatement);
+      syncService.addPipe(
+          SyncPipeUtil.parseCreatePipePlanAsPipeInfo(
+              createPipeStatement, System.currentTimeMillis()));
     } catch (PipeException e) {
       return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, e.getMessage());
     }
@@ -1383,7 +1386,7 @@ public class LocalConfigNode {
   }
 
   public TShowPipeResp showPipe(String pipeName) {
-    List<TPipeInfo> pipeInfos = SyncService.getInstance().showPipe(pipeName);
+    List<TShowPipeInfo> pipeInfos = SyncService.getInstance().showPipe(pipeName);
     return new TShowPipeResp().setPipeInfoList(pipeInfos).setStatus(StatusUtils.OK);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 33db99d1dc..47247a9121 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -43,11 +43,14 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
@@ -73,6 +76,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowNodes
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowPathSetTemplateTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowSchemaTemplateTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
@@ -812,7 +816,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
       TSStatus tsStatus = configNodeClient.createPipeSink(pipeSinkInfo);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         LOGGER.error(
-            "Failed to create PipeSink {} with type {} in config node, status is {}.",
+            "Failed to create PIPESINK {} with type {} in config node, status is {}.",
             createPipeSinkStatement.getPipeSinkName(),
             createPipeSinkStatement.getPipeSinkType(),
             tsStatus);
@@ -837,7 +841,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
       TSStatus tsStatus = configNodeClient.dropPipeSink(req);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         LOGGER.error(
-            "Failed to drop PipeSink {} in config node, status is {}.",
+            "Failed to drop PIPESINK {} in config node, status is {}.",
             dropPipeSinkStatement.getPipeSinkName(),
             tsStatus);
         future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
@@ -871,50 +875,101 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
   @Override
   public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement createPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    future.setException(
-        new IoTDBException(
-            "Executing create pipe is not supported",
-            TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      TPipeInfo pipeInfo =
+          new TPipeInfo()
+              .setPipeName(createPipeStatement.getPipeName())
+              .setPipeSinkName(createPipeStatement.getPipeSinkName())
+              .setStartTime(createPipeStatement.getStartTime())
+              .setAttributes(createPipeStatement.getPipeAttributes());
+      TSStatus tsStatus = configNodeClient.createPipe(pipeInfo);
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+        LOGGER.error(
+            "Failed to create PIPE {} in config node, status is {}.",
+            createPipeStatement.getPipeName(),
+            tsStatus);
+        future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (Exception e) {
+      future.setException(e);
+    }
     return future;
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement dropPipeStatement) {
+  public SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement startPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    future.setException(
-        new IoTDBException(
-            "Executing drop pipe is not supported",
-            TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      TSStatus tsStatus = configNodeClient.startPipe(startPipeStatement.getPipeName());
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+        LOGGER.error(
+            "Failed to start PIPE {}, status is {}.", startPipeStatement.getPipeName(), tsStatus);
+        future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (Exception e) {
+      future.setException(e);
+    }
     return future;
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement showPipeStatement) {
+  public SettableFuture<ConfigTaskResult> dropPipe(DropPipeStatement dropPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    future.setException(
-        new IoTDBException(
-            "Executing show pipe is not supported",
-            TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      TSStatus tsStatus = configNodeClient.dropPipe(dropPipeStatement.getPipeName());
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+        LOGGER.error(
+            "Failed to drop PIPE {}, status is {}.", dropPipeStatement.getPipeName(), tsStatus);
+        future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (Exception e) {
+      future.setException(e);
+    }
     return future;
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> startPipe(StartPipeStatement startPipeStatement) {
+  public SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement stopPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    future.setException(
-        new IoTDBException(
-            "Executing Start pipe is not supported",
-            TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      TSStatus tsStatus = configNodeClient.stopPipe(stopPipeStatement.getPipeName());
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+        LOGGER.error(
+            "Failed to stop PIPE {}, status is {}.", stopPipeStatement.getPipeName(), tsStatus);
+        future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (Exception e) {
+      future.setException(e);
+    }
     return future;
   }
 
   @Override
-  public SettableFuture<ConfigTaskResult> stopPipe(StopPipeStatement stopPipeStatement) {
+  public SettableFuture<ConfigTaskResult> showPipe(ShowPipeStatement showPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    future.setException(
-        new IoTDBException(
-            "Executing stop pipe is not supported",
-            TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      TShowPipeReq tShowPipeReq = new TShowPipeReq();
+      if (!StringUtils.isEmpty(showPipeStatement.getPipeName())) {
+        tShowPipeReq.setPipeName(showPipeStatement.getPipeName());
+      }
+      TShowPipeResp resp = configNodeClient.showPipe(tShowPipeReq);
+      ShowPipeTask.buildTSBlock(resp.getPipeInfoList(), future);
+    } catch (Exception e) {
+      future.setException(e);
+    }
     return future;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index 934cec4434..d2c4a0a333 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.exception.sync.PipeSinkException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
@@ -72,6 +73,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -510,9 +512,13 @@ public class StandaloneConfigTaskExecutor implements IConfigTaskExecutor {
   public SettableFuture<ConfigTaskResult> showPipeSink(
       ShowPipeSinkStatement showPipeSinkStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
-    List<PipeSink> pipeSinkList =
-        LocalConfigNode.getInstance().showPipeSink(showPipeSinkStatement.getPipeSinkName());
-    ShowPipeSinkTask.buildTSBlockByPipeSink(pipeSinkList, future);
+    try {
+      List<PipeSink> pipeSinkList =
+          LocalConfigNode.getInstance().showPipeSink(showPipeSinkStatement.getPipeSinkName());
+      ShowPipeSinkTask.buildTSBlockByPipeSink(pipeSinkList, future);
+    } catch (PipeSinkException e) {
+      ShowPipeSinkTask.buildTSBlockByPipeSink(Collections.emptyList(), future);
+    }
     return future;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeSinkTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeSinkTask.java
index d78ac36e98..2f0965ab5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeSinkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeSinkTask.java
@@ -83,7 +83,7 @@ public class ShowPipeSinkTask implements IConfigTask {
             .collect(Collectors.toList());
     TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
     for (TPipeSinkInfo pipeSinkInfo : pipeSinkInfoList) {
-      PipeSink pipeSink = SyncPipeUtil.parsePipeInfoAsPipe(pipeSinkInfo);
+      PipeSink pipeSink = SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(pipeSinkInfo);
       builder.getTimeColumnBuilder().writeLong(0L);
       builder.getColumnBuilder(0).writeBinary(new Binary(pipeSink.getPipeSinkName()));
       builder.getColumnBuilder(1).writeBinary(new Binary(pipeSink.getType().name()));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
index 180cdd2d1d..5ecb7866c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/sync/ShowPipeTask.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.mpp.plan.execution.config.sys.sync;
 
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
@@ -55,13 +55,13 @@ public class ShowPipeTask implements IConfigTask {
   }
 
   public static void buildTSBlock(
-      List<TPipeInfo> pipeInfoList, SettableFuture<ConfigTaskResult> future) {
+      List<TShowPipeInfo> pipeInfoList, SettableFuture<ConfigTaskResult> future) {
     List<TSDataType> outputDataTypes =
         ColumnHeaderConstant.showPipeColumnHeaders.stream()
             .map(ColumnHeader::getColumnType)
             .collect(Collectors.toList());
     TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
-    for (TPipeInfo tPipeInfo : pipeInfoList) {
+    for (TShowPipeInfo tPipeInfo : pipeInfoList) {
       builder.getTimeColumnBuilder().writeLong(0L);
       builder
           .getColumnBuilder(0)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 00a275f1e2..1c12638403 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -159,6 +159,7 @@ import org.apache.iotdb.db.tools.TsFileSplitByPartitionTool;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -2494,7 +2495,8 @@ public class PlanExecutor implements IPlanExecutor {
 
   private void createPipe(CreatePipePlan plan) throws QueryProcessException {
     try {
-      SyncService.getInstance().addPipe(plan);
+      SyncService.getInstance()
+          .addPipe(SyncPipeUtil.parseCreatePipePlanAsPipeInfo(plan, System.currentTimeMillis()));
     } catch (PipeException e) {
       throw new QueryProcessException("Create pipe error.", e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 33d8b9de14..63df1528e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -34,8 +34,11 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
@@ -82,6 +85,7 @@ import org.apache.iotdb.db.service.RegionMigrateService;
 import org.apache.iotdb.db.service.metrics.MetricService;
 import org.apache.iotdb.db.service.metrics.enums.Metric;
 import org.apache.iotdb.db.service.metrics.enums.Tag;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.trigger.service.TriggerManagementService;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -97,6 +101,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
@@ -118,6 +123,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
@@ -505,6 +511,40 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     return RpcUtils.SUCCESS_STATUS;
   }
 
+  @Override
+  public TSStatus createPipeOnDataNode(TCreatePipeOnDataNodeReq req) throws TException {
+    try {
+      PipeInfo pipeInfo = PipeInfo.deserializePipeInfo(req.pipeInfo);
+      SyncService.getInstance().addPipe(pipeInfo);
+      return RpcUtils.SUCCESS_STATUS;
+    } catch (PipeException e) { // reported to the caller as a PIPE_ERROR status, not thrown
+      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+  }
+
+  @Override
+  public TSStatus operatePipeOnDataNode(TOperatePipeOnDataNodeReq req) throws TException {
+    // values()[ordinal] throws on an ordinal unknown to this node; treat that as unsupported.
+    SyncOperation[] ops = SyncOperation.values();
+    int ordinal = req.getOperation();
+    SyncOperation op = ordinal >= 0 && ordinal < ops.length ? ops[ordinal] : null;
+    try {
+      if (op == SyncOperation.START_PIPE) {
+        SyncService.getInstance().startPipe(req.getPipeName());
+      } else if (op == SyncOperation.STOP_PIPE) {
+        SyncService.getInstance().stopPipe(req.getPipeName());
+      } else if (op == SyncOperation.DROP_PIPE) {
+        SyncService.getInstance().dropPipe(req.getPipeName());
+      } else {
+        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+            .setMessage("Unsupported operation.");
+      }
+      return RpcUtils.SUCCESS_STATUS;
+    } catch (PipeException e) {
+      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+  }
+
   private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, String storageGroup) {
     PathPatternTree filteredPatternTree = new PathPatternTree();
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index 35a064c531..f4b9cd6ce4 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -30,18 +30,20 @@ import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.commons.sync.utils.SyncConstant;
 import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
 import org.apache.iotdb.db.query.dataset.ListDataSet;
+import org.apache.iotdb.db.sync.common.ClusterSyncInfoFetcher;
 import org.apache.iotdb.db.sync.common.ISyncInfoFetcher;
 import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
 import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
@@ -75,19 +77,25 @@ import java.util.List;
 
 public class SyncService implements IService {
   private static final Logger logger = LoggerFactory.getLogger(SyncService.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private Pipe runningPipe;
 
   /* handle external Pipe */
   private ExtPipePluginManager extPipePluginManager;
 
-  private ISyncInfoFetcher syncInfoFetcher = LocalSyncInfoFetcher.getInstance();
+  private ISyncInfoFetcher syncInfoFetcher;
 
   /* handle rpc in receiver-side*/
   private final ReceiverManager receiverManager;
 
   private SyncService() {
     receiverManager = new ReceiverManager();
+    if (config.isClusterMode()) {
+      syncInfoFetcher = ClusterSyncInfoFetcher.getInstance();
+    } else {
+      syncInfoFetcher = LocalSyncInfoFetcher.getInstance();
+    }
   }
 
   private static class SyncServiceHolder {
@@ -124,7 +132,7 @@ public class SyncService implements IService {
 
   // region Interfaces and Implementation of PipeSink
 
-  public PipeSink getPipeSink(String name) {
+  public PipeSink getPipeSink(String name) throws PipeSinkException {
     return syncInfoFetcher.getPipeSink(name);
   }
 
@@ -159,51 +167,35 @@ public class SyncService implements IService {
 
   // region Interfaces and Implementation of Pipe
 
-  // TODO(sync): delete this in new-standalone version
-  public synchronized void addPipe(CreatePipePlan plan) throws PipeException {
-    // check plan
+  public synchronized void addPipe(PipeInfo pipeInfo) throws PipeException {
+    logger.info("Execute CREATE PIPE {}", pipeInfo.getPipeName());
     long currentTime = DatetimeUtils.currentTime();
-    if (plan.getDataStartTimestamp() > currentTime) {
-      throw new PipeException(
-          String.format(
-              "Start time %s is later than current time %s, this is not supported yet.",
-              DatetimeUtils.convertLongToDate(plan.getDataStartTimestamp()),
-              DatetimeUtils.convertLongToDate(currentTime)));
+    if (pipeInfo instanceof TsFilePipeInfo) {
+      // TODO(sync): move check logic to PipeInfo#validate()
+      // check statement
+      if (((TsFilePipeInfo) pipeInfo).getDataStartTimestamp() > currentTime) {
+        throw new PipeException(
+            String.format(
+                "Start time %s is later than current time %s, this is not supported yet.",
+                DatetimeUtils.convertLongToDate(
+                    ((TsFilePipeInfo) pipeInfo).getDataStartTimestamp()),
+                DatetimeUtils.convertLongToDate(currentTime)));
+      }
     }
     // add pipe
-    TSStatus status = syncInfoFetcher.addPipe(plan, currentTime);
+    TSStatus status = syncInfoFetcher.addPipe(pipeInfo);
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new PipeException(status.message);
     }
-
-    PipeSink runningPipeSink = getPipeSink(plan.getPipeSinkName());
-    runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipe(plan, runningPipeSink, currentTime);
-    if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.ExternalPipe) {
-      // for external pipe
-      // == start ExternalPipeProcessor for send data to external pipe plugin
-      startExternalPipeManager(false);
+    PipeSink runningPipeSink;
+    try {
+      runningPipeSink = getPipeSink(pipeInfo.getPipeSinkName());
+    } catch (PipeSinkException e) {
+      logger.error("failed to add PIPE because {}", e.getMessage(), e);
+      throw new PipeException(String.format("failed to add PIPE because %s", e.getMessage()));
     }
-  }
 
-  public synchronized void addPipe(CreatePipeStatement statement) throws PipeException {
-    logger.info("Execute CREATE PIPE {}", statement.getPipeName());
-    // check statement
-    long currentTime = DatetimeUtils.currentTime();
-    if (statement.getStartTime() > currentTime) {
-      throw new PipeException(
-          String.format(
-              "Start time %s is later than current time %s, this is not supported yet.",
-              DatetimeUtils.convertLongToDate(statement.getStartTime()),
-              DatetimeUtils.convertLongToDate(currentTime)));
-    }
-    // add pipe
-    TSStatus status = syncInfoFetcher.addPipe(statement, currentTime);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(status.message);
-    }
-
-    PipeSink runningPipeSink = getPipeSink(statement.getPipeSinkName());
-    runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipe(statement, runningPipeSink, currentTime);
+    runningPipe = SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(pipeInfo, runningPipeSink);
     if (runningPipe.getPipeSink().getType()
         == PipeSink.PipeSinkType.ExternalPipe) { // for external pipe
       // == start ExternalPipeProcessor for send data to external pipe plugin
@@ -212,7 +204,7 @@ public class SyncService implements IService {
   }
 
   public synchronized void stopPipe(String pipeName) throws PipeException {
-    logger.info("Execute STOP PIPE {}", pipeName);
+    logger.info("Execute stop PIPE {}", pipeName);
     checkRunningPipeExistAndName(pipeName);
     if (runningPipe.getStatus() == PipeStatus.RUNNING) {
       if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
@@ -232,11 +224,14 @@ public class SyncService implements IService {
         runningPipe.stop();
       }
     }
-    syncInfoFetcher.stopPipe(pipeName);
+    TSStatus status = syncInfoFetcher.stopPipe(pipeName);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(status.message);
+    }
   }
 
   public synchronized void startPipe(String pipeName) throws PipeException {
-    logger.info("Execute START PIPE {}", pipeName);
+    logger.info("Execute start PIPE {}", pipeName);
     checkRunningPipeExistAndName(pipeName);
     if (runningPipe.getStatus() == PipeStatus.STOP) {
       if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
@@ -246,11 +241,14 @@ public class SyncService implements IService {
         startExternalPipeManager(true);
       }
     }
-    syncInfoFetcher.startPipe(pipeName);
+    TSStatus status = syncInfoFetcher.startPipe(pipeName);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(status.message);
+    }
   }
 
   public synchronized void dropPipe(String pipeName) throws PipeException {
-    logger.info("Execute DROP PIPE {}", pipeName);
+    logger.info("Execute drop PIPE {}", pipeName);
     checkRunningPipeExistAndName(pipeName);
     if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
       runningPipe.drop();
@@ -265,7 +263,10 @@ public class SyncService implements IService {
       runningPipe.drop();
     }
 
-    syncInfoFetcher.dropPipe(pipeName);
+    TSStatus status = syncInfoFetcher.dropPipe(pipeName);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(status.message);
+    }
   }
 
   public List<PipeInfo> getAllPipeInfos() {
@@ -274,19 +275,19 @@ public class SyncService implements IService {
 
   private void checkRunningPipeExistAndName(String pipeName) throws PipeException {
     if (runningPipe == null || runningPipe.getStatus() == PipeStatus.DROP) {
-      throw new PipeException("There is no existing pipe.");
+      throw new PipeException("There is no existing PIPE.");
     }
     if (!runningPipe.getName().equals(pipeName)) {
       throw new PipeException(
           String.format(
-              "Pipe %s is %s, please retry after drop it.",
+              "PIPE %s is %s, please retry after drop it.",
               runningPipe.getName(), runningPipe.getStatus()));
     }
   }
 
   public synchronized void recordMessage(PipeMessage message) {
     if (runningPipe == null || runningPipe.getStatus() == PipeStatus.DROP) {
-      logger.info(String.format("No running pipe for message %s.", message));
+      logger.info(String.format("No running PIPE for message %s.", message));
       return;
     }
     TSStatus status = null;
@@ -300,7 +301,7 @@ public class SyncService implements IService {
         } catch (PipeException e) {
           logger.error(
               String.format(
-                  "Stop pipe %s when meeting error in sender service.", runningPipe.getName()),
+                  "Stop PIPE %s when meeting error in sender service.", runningPipe.getName()),
               e);
         }
         break;
@@ -317,14 +318,14 @@ public class SyncService implements IService {
     }
   }
 
-  public List<TPipeInfo> showPipe(String pipeName) {
+  public List<TShowPipeInfo> showPipe(String pipeName) {
     boolean showAll = StringUtils.isEmpty(pipeName);
-    List<TPipeInfo> list = new ArrayList<>();
+    List<TShowPipeInfo> list = new ArrayList<>();
     // show pipe in sender
     for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
       if (showAll || pipeName.equals(pipe.getPipeName())) {
-        TPipeInfo tPipeInfo =
-            new TPipeInfo(
+        TShowPipeInfo tPipeInfo =
+            new TShowPipeInfo(
                 pipe.getCreateTime(),
                 pipe.getPipeName(),
                 SyncConstant.ROLE_SENDER,
@@ -337,8 +338,8 @@ public class SyncService implements IService {
     // show pipe in receiver
     for (TSyncIdentityInfo identityInfo : receiverManager.getAllTSyncIdentityInfos()) {
       if (showAll || pipeName.equals(identityInfo.getPipeName())) {
-        TPipeInfo tPipeInfo =
-            new TPipeInfo(
+        TShowPipeInfo tPipeInfo =
+            new TShowPipeInfo(
                 identityInfo.getCreateTime(),
                 identityInfo.getPipeName(),
                 SyncConstant.ROLE_RECEIVER,
@@ -358,37 +359,42 @@ public class SyncService implements IService {
     // show pipe in sender
     for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
       if (showAll || plan.getPipeName().equals(pipe.getPipeName())) {
-        RowRecord record = new RowRecord(0);
-        record.addField(
-            Binary.valueOf(DatetimeUtils.convertLongToDate(pipe.getCreateTime())), TSDataType.TEXT);
-        record.addField(Binary.valueOf(pipe.getPipeName()), TSDataType.TEXT);
-        record.addField(Binary.valueOf(IoTDBConstant.SYNC_SENDER_ROLE), TSDataType.TEXT);
-        record.addField(Binary.valueOf(pipe.getPipeSinkName()), TSDataType.TEXT);
-        record.addField(Binary.valueOf(pipe.getStatus().name()), TSDataType.TEXT);
-        PipeSink pipeSink = syncInfoFetcher.getPipeSink(pipe.getPipeSinkName());
-        if (pipeSink.getType() == PipeSink.PipeSinkType.ExternalPipe) { // for external pipe
-          ExtPipePluginManager extPipePluginManager =
-              SyncService.getInstance().getExternalPipeManager();
-
-          if (extPipePluginManager != null) {
-            String extPipeType = ((ExternalPipeSink) pipeSink).getExtPipeSinkTypeName();
-            ExternalPipeStatus externalPipeStatus =
-                extPipePluginManager.getExternalPipeStatus(extPipeType);
-
-            // TODO(ext-pipe): Adapting to the new syntax of SHOW PIPE
-            if (externalPipeStatus != null) {
-              record.addField(
-                  Binary.valueOf(
-                      externalPipeStatus.getWriterInvocationFailures().toString()
-                          + ";"
-                          + externalPipeStatus.getWriterStatuses().toString()),
-                  TSDataType.TEXT);
+        try {
+          RowRecord record = new RowRecord(0);
+          record.addField(
+              Binary.valueOf(DatetimeUtils.convertLongToDate(pipe.getCreateTime())),
+              TSDataType.TEXT);
+          record.addField(Binary.valueOf(pipe.getPipeName()), TSDataType.TEXT);
+          record.addField(Binary.valueOf(IoTDBConstant.SYNC_SENDER_ROLE), TSDataType.TEXT);
+          record.addField(Binary.valueOf(pipe.getPipeSinkName()), TSDataType.TEXT);
+          record.addField(Binary.valueOf(pipe.getStatus().name()), TSDataType.TEXT);
+          PipeSink pipeSink = syncInfoFetcher.getPipeSink(pipe.getPipeSinkName());
+          if (pipeSink.getType() == PipeSink.PipeSinkType.ExternalPipe) { // for external pipe
+            ExtPipePluginManager extPipePluginManager =
+                SyncService.getInstance().getExternalPipeManager();
+
+            if (extPipePluginManager != null) {
+              String extPipeType = ((ExternalPipeSink) pipeSink).getExtPipeSinkTypeName();
+              ExternalPipeStatus externalPipeStatus =
+                  extPipePluginManager.getExternalPipeStatus(extPipeType);
+
+              // TODO(ext-pipe): Adapting to the new syntax of SHOW PIPE
+              if (externalPipeStatus != null) {
+                record.addField(
+                    Binary.valueOf(
+                        externalPipeStatus.getWriterInvocationFailures().toString()
+                            + ";"
+                            + externalPipeStatus.getWriterStatuses().toString()),
+                    TSDataType.TEXT);
+              }
             }
+          } else {
+            record.addField(Binary.valueOf(pipe.getMessageType().name()), TSDataType.TEXT);
           }
-        } else {
-          record.addField(Binary.valueOf(pipe.getMessageType().name()), TSDataType.TEXT);
+          listDataSet.putRecord(record);
+        } catch (Exception e) {
+          logger.error("failed to show pipe [{}] because {}", pipe.getPipeName(), e.getMessage());
         }
-        listDataSet.putRecord(record);
       }
     }
     // show pipe in receiver
@@ -489,7 +495,7 @@ public class SyncService implements IService {
         runningPipe.close();
       } catch (PipeException e) {
         logger.warn(
-            String.format("Stop pipe %s error when stop Sender Service.", runningPipe.getName()),
+            String.format("Stop PIPE %s error when stop Sender Service.", runningPipe.getName()),
             e);
       }
     }
@@ -516,13 +522,13 @@ public class SyncService implements IService {
     return ServiceType.SYNC_SERVICE;
   }
 
-  private void recover() throws IOException, PipeException, StartupException {
+  private void recover() throws IOException, PipeException, PipeSinkException {
     PipeInfo runningPipeInfo = syncInfoFetcher.getRunningPipeInfo();
     if (runningPipeInfo == null || PipeStatus.DROP.equals(runningPipeInfo.getStatus())) {
       return;
     } else {
       this.runningPipe =
-          SyncPipeUtil.parsePipeInfoAsPipe(
+          SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(
               runningPipeInfo, syncInfoFetcher.getPipeSink(runningPipeInfo.getPipeSinkName()));
       switch (runningPipeInfo.getStatus()) {
         case RUNNING:
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
new file mode 100644
index 0000000000..b21c6a23f6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.common;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.exception.sync.PipeSinkException;
+import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeMessage;
+import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
+import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
+import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
+import org.apache.iotdb.rpc.RpcUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/** Only read requests are fetched from the ConfigNode; write requests return SUCCESS directly. */
+public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSyncInfoFetcher.class);
+
+  private static final IClientManager<PartitionRegionId, ConfigNodeClient>
+      CONFIG_NODE_CLIENT_MANAGER =
+          new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
+              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+
+  // region Interfaces of PipeSink
+
+  @Override
+  public TSStatus addPipeSink(CreatePipeSinkPlan plan) {
+    return RpcUtils.SUCCESS_STATUS;
+  }
+
+  @Override
+  public TSStatus addPipeSink(CreatePipeSinkStatement createPipeSinkStatement) {
+    return RpcUtils.SUCCESS_STATUS;
+  }
+
+  @Override
+  public TSStatus dropPipeSink(String name) {
+    return RpcUtils.SUCCESS_STATUS;
+  }
+
+  @Override
+  public PipeSink getPipeSink(String name) throws PipeSinkException {
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      TGetPipeSinkReq tGetPipeSinkReq = new TGetPipeSinkReq().setPipeSinkName(name);
+      TGetPipeSinkResp resp = configNodeClient.getPipeSink(tGetPipeSinkReq);
+      if (resp.getPipeSinkInfoList().isEmpty()) {
+        throw new PipeSinkException(
+            String.format("Failed to getPipeSink [%s] because it does not exist.", name));
+      }
+      return SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(resp.getPipeSinkInfoList().get(0));
+    } catch (Exception e) {
+      LOGGER.error("Get PipeSink [{}] error because {}", name, e.getMessage(), e);
+      throw new PipeSinkException(e.getMessage());
+    }
+  }
+
+  @Override
+  public List<PipeSink> getAllPipeSinks() {
+    throw new UnsupportedOperationException();
+  }
+
+  // endregion
+
+  // region Interfaces of Pipe
+
+  @Override
+  public TSStatus addPipe(PipeInfo pipeInfo) {
+    return RpcUtils.SUCCESS_STATUS;
+  }
+
+  @Override
+  public TSStatus stopPipe(String pipeName) {
+    return RpcUtils.SUCCESS_STATUS;
+  }
+
+  @Override
+  public TSStatus startPipe(String pipeName) {
+    return RpcUtils.SUCCESS_STATUS;
+  }
+
+  @Override
+  public TSStatus dropPipe(String pipeName) {
+    return RpcUtils.SUCCESS_STATUS;
+  }
+
+  @Override
+  public List<PipeInfo> getAllPipeInfos() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public PipeInfo getRunningPipeInfo() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TSStatus recordMsg(String pipeName, long createTime, PipeMessage message) {
+    return RpcUtils.SUCCESS_STATUS; // write path is a no-op here; never hand callers a null
+  }
+
+  // endregion
+
+  // region singleton
+  private static class ClusterSyncInfoFetcherHolder {
+    private static final ClusterSyncInfoFetcher INSTANCE = new ClusterSyncInfoFetcher();
+
+    private ClusterSyncInfoFetcherHolder() {}
+  }
+
+  public static ClusterSyncInfoFetcher getInstance() {
+    return ClusterSyncInfoFetcher.ClusterSyncInfoFetcherHolder.INSTANCE;
+  }
+  // endregion
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
index 5266c16b4e..468456dc23 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
@@ -19,12 +19,11 @@
 package org.apache.iotdb.db.sync.common;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.sync.PipeSinkException;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 
 import java.util.List;
@@ -39,17 +38,14 @@ public interface ISyncInfoFetcher {
 
   TSStatus dropPipeSink(String name);
 
-  PipeSink getPipeSink(String name);
+  PipeSink getPipeSink(String name) throws PipeSinkException;
 
   List<PipeSink> getAllPipeSinks();
   // endregion
 
   // region Interfaces of Pipe
 
-  // TODO(sync): delete this in new-standalone version
-  TSStatus addPipe(CreatePipePlan plan, long createTime);
-
-  TSStatus addPipe(CreatePipeStatement createPipeStatement, long createTime);
+  TSStatus addPipe(PipeInfo pipeInfo);
 
   TSStatus stopPipe(String pipeName);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
index a90a67ba9a..ed56cd2520 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfo.java
@@ -25,12 +25,11 @@ import org.apache.iotdb.commons.sync.persistence.SyncLogReader;
 import org.apache.iotdb.commons.sync.persistence.SyncLogWriter;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.PipeMessage;
+import org.apache.iotdb.commons.sync.pipe.PipeStatus;
 import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 
@@ -105,33 +104,29 @@ public class LocalSyncInfo {
   // endregion
 
   // region Implement of Pipe
-  // TODO: delete this in new-standalone version
-  public void addPipe(CreatePipePlan plan, long createTime) throws PipeException, IOException {
-    if (!syncMetadata.isPipeSinkExist(plan.getPipeSinkName())) {
-      throw new PipeException(String.format("Can not find pipeSink %s.", plan.getPipeSinkName()));
-    }
-    PipeSink pipeSink = getPipeSink(plan.getPipeSinkName());
-    PipeInfo pipeInfo = SyncPipeUtil.parseCreatePipePlanAsPipeInfo(plan, pipeSink, createTime);
-    syncMetadata.addPipe(pipeInfo, pipeSink);
-    syncLogWriter.addPipe(pipeInfo);
-  }
 
-  public void addPipe(CreatePipeStatement createPipeStatement, long createTime)
-      throws PipeException, IOException {
-    if (!syncMetadata.isPipeSinkExist(createPipeStatement.getPipeSinkName())) {
-      throw new PipeException(
-          String.format("Can not find pipeSink %s.", createPipeStatement.getPipeSinkName()));
-    }
-    PipeSink pipeSink = getPipeSink(createPipeStatement.getPipeSinkName());
-    PipeInfo pipeInfo =
-        SyncPipeUtil.parseCreatePipePlanAsPipeInfo(createPipeStatement, pipeSink, createTime);
-    syncMetadata.addPipe(pipeInfo, pipeSink);
+  public void addPipe(PipeInfo pipeInfo) throws PipeException, IOException { // register a pipe
+    syncMetadata.checkAddPipe(pipeInfo); // presumably rejects an invalid pipe — confirm
+    syncMetadata.addPipe(pipeInfo); // record the pipe in syncMetadata first
     syncLogWriter.addPipe(pipeInfo); // then hand the same pipeInfo to syncLogWriter
   }
 
   public void operatePipe(String pipeName, SyncOperation syncOperation)
       throws PipeException, IOException {
-    syncMetadata.operatePipe(pipeName, syncOperation);
+    syncMetadata.checkIfPipeExist(pipeName); // presumably throws if pipeName is unknown — confirm
+    switch (syncOperation) { // translate the requested operation into the PipeStatus to set
+      case START_PIPE:
+        syncMetadata.setPipeStatus(pipeName, PipeStatus.RUNNING);
+        break;
+      case STOP_PIPE:
+        syncMetadata.setPipeStatus(pipeName, PipeStatus.STOP);
+        break;
+      case DROP_PIPE:
+        syncMetadata.setPipeStatus(pipeName, PipeStatus.DROP);
+        break;
+      default: // any other operation is rejected before syncLogWriter is called
+        throw new PipeException("Unknown operatorType " + syncOperation);
+    }
     syncLogWriter.operatePipe(pipeName, syncOperation); // reached only after the status was set
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
index 0a6700f5bc..1e13ffa9f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -94,19 +92,9 @@ public class LocalSyncInfoFetcher implements ISyncInfoFetcher {
   // region Implement of Pipe
 
   @Override
-  public TSStatus addPipe(CreatePipePlan plan, long createTime) {
+  public TSStatus addPipe(PipeInfo pipeInfo) {
     try {
-      localSyncInfo.addPipe(plan, createTime);
-      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-    } catch (PipeException | IOException e) {
-      return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
-    }
-  }
-
-  @Override
-  public TSStatus addPipe(CreatePipeStatement createPipeStatement, long createTime) {
-    try {
-      localSyncInfo.addPipe(createPipeStatement, createTime);
+      localSyncInfo.addPipe(pipeInfo);
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     } catch (PipeException e) {
       return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, e.getMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
index b926ac1bca..21c81e892b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/ExternalPipeSink.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -132,6 +133,13 @@ public class ExternalPipeSink implements PipeSink {
     sinkParams = ReadWriteIOUtils.readMap(inputStream);
   }
 
+  @Override
+  public void deserialize(ByteBuffer buffer) { // populate this sink's fields from buffer
+    pipeSinkName = ReadWriteIOUtils.readString(buffer); // read order: name,
+    extPipeSinkTypeName = ReadWriteIOUtils.readString(buffer); // then external sink type name,
+    sinkParams = ReadWriteIOUtils.readMap(buffer); // then params; assumes writer order — confirm
+  }
+
   public Map<String, String> getSinkParams() {
     return sinkParams;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
index 9011e1c480..af7b06cde0 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/sync/SyncPipeUtil.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
 import org.apache.iotdb.commons.sync.pipesink.IoTDBPipeSink;
 import org.apache.iotdb.commons.sync.pipesink.PipeSink;
+import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
@@ -67,47 +68,9 @@ public class SyncPipeUtil {
   }
 
   // TODO(sync): delete this in new-standalone version
-  public static Pipe parseCreatePipePlanAsPipe(
-      CreatePipePlan plan, PipeSink pipeSink, long pipeCreateTime) throws PipeException {
-    boolean syncDelOp = true;
-    for (Pair<String, String> pair : plan.getPipeAttributes()) {
-      pair.left = pair.left.toLowerCase();
-      if ("syncdelop".equals(pair.left)) {
-        syncDelOp = Boolean.parseBoolean(pair.right);
-      } else {
-        throw new PipeException(String.format("Can not recognition attribute %s", pair.left));
-      }
-    }
-
-    return new TsFilePipe(
-        pipeCreateTime, plan.getPipeName(), pipeSink, plan.getDataStartTimestamp(), syncDelOp);
-  }
-
-  public static Pipe parseCreatePipePlanAsPipe(
-      CreatePipeStatement createPipeStatement, PipeSink pipeSink, long pipeCreateTime)
+  public static PipeInfo parseCreatePipePlanAsPipeInfo(CreatePipePlan plan, long pipeCreateTime)
       throws PipeException {
     boolean syncDelOp = true;
-    for (Map.Entry<String, String> entry : createPipeStatement.getPipeAttributes().entrySet()) {
-      String attributeKey = entry.getKey().toLowerCase();
-      if ("syncdelop".equals(attributeKey)) {
-        syncDelOp = Boolean.parseBoolean(entry.getValue());
-      } else {
-        throw new PipeException(String.format("Can not recognition attribute %s", entry.getKey()));
-      }
-    }
-
-    return new TsFilePipe(
-        pipeCreateTime,
-        createPipeStatement.getPipeName(),
-        pipeSink,
-        createPipeStatement.getStartTime(),
-        syncDelOp);
-  }
-
-  // TODO(sync): delete this in new-standalone version
-  public static PipeInfo parseCreatePipePlanAsPipeInfo(
-      CreatePipePlan plan, PipeSink pipeSink, long pipeCreateTime) throws PipeException {
-    boolean syncDelOp = true;
     for (Pair<String, String> pair : plan.getPipeAttributes()) {
       pair.left = pair.left.toLowerCase();
       if ("syncdelop".equals(pair.left)) {
@@ -119,15 +82,14 @@ public class SyncPipeUtil {
 
     return new TsFilePipeInfo(
         plan.getPipeName(),
-        pipeSink.getPipeSinkName(),
+        plan.getPipeSinkName(),
         pipeCreateTime,
         plan.getDataStartTimestamp(),
         syncDelOp);
   }
 
   public static PipeInfo parseCreatePipePlanAsPipeInfo(
-      CreatePipeStatement createPipeStatement, PipeSink pipeSink, long pipeCreateTime)
-      throws PipeException {
+      CreatePipeStatement createPipeStatement, long pipeCreateTime) throws PipeException {
     boolean syncDelOp = true;
     for (Map.Entry<String, String> entry : createPipeStatement.getPipeAttributes().entrySet()) {
       String attributeKey = entry.getKey().toLowerCase();
@@ -140,14 +102,14 @@ public class SyncPipeUtil {
 
     return new TsFilePipeInfo(
         createPipeStatement.getPipeName(),
-        pipeSink.getPipeSinkName(),
+        createPipeStatement.getPipeSinkName(),
         pipeCreateTime,
         createPipeStatement.getStartTime(),
         syncDelOp);
   }
 
   /** parse PipeInfo to Pipe, ignore status */
-  public static Pipe parsePipeInfoAsPipe(PipeInfo pipeInfo, PipeSink pipeSink)
+  public static Pipe parseTPipeSinkInfoAsPipeSink(PipeInfo pipeInfo, PipeSink pipeSink)
       throws PipeException {
     if (pipeInfo instanceof TsFilePipeInfo) {
       return new TsFilePipe(
@@ -162,7 +124,8 @@ public class SyncPipeUtil {
   }
 
   /** parse TPipeSinkInfo to PipeSink */
-  public static PipeSink parsePipeInfoAsPipe(TPipeSinkInfo pipeSinkInfo) throws PipeSinkException {
+  public static PipeSink parseTPipeSinkInfoAsPipeSink(TPipeSinkInfo pipeSinkInfo)
+      throws PipeSinkException {
     if (pipeSinkInfo.getPipeSinkType().equals(PipeSink.PipeSinkType.IoTDB.name())) {
       PipeSink pipeSink = new IoTDBPipeSink(pipeSinkInfo.getPipeSinkName());
       pipeSink.setAttribute(pipeSinkInfo.getAttributes());
@@ -172,4 +135,25 @@ public class SyncPipeUtil {
       throw new UnsupportedOperationException();
     }
   }
+
+  /**
+   * Parse TPipeInfo to PipeInfo.
+   *
+   * <p>The only recognized attribute is "syncdelop" (key compared case-insensitively), which
+   * defaults to true when it is not supplied. The attributes map is optional in the thrift
+   * definition, so an unset (null) map is treated as "no attributes".
+   *
+   * @param pipeInfo thrift description of the pipe to create
+   * @param pipeCreateTime create time to store in the resulting PipeInfo
+   * @return a TsFilePipeInfo built from the given thrift structure
+   * @throws PipeException if an attribute other than "syncdelop" is present
+   */
+  public static PipeInfo parseTPipeInfoAsPipeInfo(TPipeInfo pipeInfo, long pipeCreateTime)
+      throws PipeException {
+    boolean syncDelOp = true;
+    // attributes is an optional thrift field: the getter returns null when the client omits it
+    Map<String, String> attributes = pipeInfo.getAttributes();
+    if (attributes != null) {
+      for (Map.Entry<String, String> entry : attributes.entrySet()) {
+        String attributeKey = entry.getKey().toLowerCase();
+        if ("syncdelop".equals(attributeKey)) {
+          syncDelOp = Boolean.parseBoolean(entry.getValue());
+        } else {
+          throw new PipeException(
+              String.format("Can not recognition attribute %s", entry.getKey()));
+        }
+      }
+    }
+
+    return new TsFilePipeInfo(
+        pipeInfo.getPipeName(),
+        pipeInfo.getPipeSinkName(),
+        pipeCreateTime,
+        pipeInfo.getStartTime(),
+        syncDelOp);
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/LocalSyncInfoTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/LocalSyncInfoTest.java
index 878835691e..5bb762befb 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/LocalSyncInfoTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/LocalSyncInfoTest.java
@@ -22,8 +22,8 @@ import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.sync.common.LocalSyncInfo;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -59,21 +59,21 @@ public class LocalSyncInfoTest {
       createPipeSinkPlan.addPipeSinkAttribute("ip", "127.0.0.1");
       createPipeSinkPlan.addPipeSinkAttribute("port", "6670");
       try {
-        localSyncInfo.addPipe(new CreatePipePlan(pipe1, "demo"), createdTime1);
+        localSyncInfo.addPipe(new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true));
         Assert.fail();
       } catch (PipeException e) {
         // throw exception because can not find pipeSink
       }
       localSyncInfo.addPipeSink(createPipeSinkPlan);
-      localSyncInfo.addPipe(new CreatePipePlan(pipe1, "demo"), createdTime1);
+      localSyncInfo.addPipe(new TsFilePipeInfo(pipe1, "demo", createdTime1, 0, true));
       try {
-        localSyncInfo.addPipe(new CreatePipePlan(pipe2, "demo"), createdTime2);
+        localSyncInfo.addPipe(new TsFilePipeInfo(pipe2, "demo", createdTime2, 0, true));
         Assert.fail();
       } catch (PipeException e) {
         // throw exception because only one pipe is allowed now
       }
       localSyncInfo.operatePipe(pipe1, SyncOperation.DROP_PIPE);
-      localSyncInfo.addPipe(new CreatePipePlan(pipe2, "demo"), createdTime2);
+      localSyncInfo.addPipe(new TsFilePipeInfo(pipe2, "demo", createdTime2, 0, true));
       localSyncInfo.operatePipe(pipe2, SyncOperation.STOP_PIPE);
       localSyncInfo.operatePipe(pipe2, SyncOperation.START_PIPE);
       Assert.assertEquals(1, localSyncInfo.getAllPipeSink().size());
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index d930bde8c8..4f4fc495d5 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -437,7 +437,7 @@ struct TGetPathsSetTemplatesResp {
 }
 
 // SYNC
-struct TPipeInfo {
+struct TShowPipeInfo {
   1: required i64 createTime
   2: required string pipeName
   3: required string role
@@ -446,6 +446,13 @@ struct TPipeInfo {
   6: required string message
 }
 
+struct TPipeInfo { // argument of IConfigNodeRPCService.createPipe
+    1: required string pipeName
+    2: required string pipeSinkName
+    3: required i64 startTime // presumably the data start timestamp of the pipe — confirm
+    4: optional map<string, string> attributes // may be unset; readers must handle absence
+}
+
 struct TPipeSinkInfo {
   1: required string pipeSinkName
   2: required string pipeSinkType
@@ -465,9 +472,13 @@ struct TGetPipeSinkResp {
   2: required list<TPipeSinkInfo> pipeSinkInfoList
 }
 
+struct TShowPipeReq { // argument of IConfigNodeRPCService.showPipe
+  1: optional string pipeName // per the showPipe doc: if name is empty, all pipes are shown
+}
+
 struct TShowPipeResp { // return value of IConfigNodeRPCService.showPipe
   1: required common.TSStatus status
-  2: optional list<TPipeInfo> pipeInfoList
+  2: optional list<TShowPipeInfo> pipeInfoList // optional: may be unset, check before reading
 }
 
 struct TDeleteTimeSeriesReq{
@@ -828,6 +839,21 @@ service IConfigNodeRPCService {
   /** Get PipeSink by name, if name is empty, get all PipeSink */
   TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req)
 
+  /** Create Pipe */
+  common.TSStatus createPipe(TPipeInfo req)
+
+  /** Start Pipe */
+  common.TSStatus startPipe(string pipeName)
+
+  /** Stop Pipe */
+  common.TSStatus stopPipe(string pipeName)
+
+  /** Drop Pipe */
+  common.TSStatus dropPipe(string pipeName)
+
+  /** Show Pipe by name, if name is empty, show all Pipe */
+  TShowPipeResp showPipe(TShowPipeReq req)
+
   // ======================================================
   // TestTools
   // ======================================================
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 60cb2a2d43..cf35dd7faf 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -280,6 +280,16 @@ struct TDeleteTimeSeriesReq{
   2: required binary pathPatternTree
 }
 
+struct TCreatePipeOnDataNodeReq{ // argument of IDataNodeRPCService.createPipeOnDataNode
+  1: required binary pipeInfo // presumably a serialized PipeInfo — confirm against the sender
+}
+
+struct TOperatePipeOnDataNodeReq { // argument of operatePipeOnDataNode (start, stop or drop)
+    1: required string pipeName // name of the PIPE the operation applies to
+    // ordinal of {@linkplain SyncOperation}
+    2: required i8 operation
+}
+
 service IDataNodeRPCService {
 
   // -----------------------------------For Data Node-----------------------------------------------
@@ -519,6 +529,16 @@ service IDataNodeRPCService {
   * Delete matched timeseries and remove according schema black list in target schemRegion
   */
   common.TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req)
+
+ /**
+  * Create PIPE on DataNode
+  */
+  common.TSStatus createPipeOnDataNode(TCreatePipeOnDataNodeReq req)
+
+ /**
+  * Start, stop or drop PIPE on DataNode
+  */
+  common.TSStatus operatePipeOnDataNode(TOperatePipeOnDataNodeReq req)
 }
 
 service MPPDataExchangeService {