You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/04/18 08:47:29 UTC

[iotdb] branch master updated: [IOTDB-5727] pipe task management at config node (#9533)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 110ea55786 [IOTDB-5727] pipe task management at config node  (#9533)
110ea55786 is described below

commit 110ea55786051c23d9bd2e422f1952f337c8ac68
Author: Caideyipi <87...@users.noreply.github.com>
AuthorDate: Tue Apr 18 16:47:17 2023 +0800

    [IOTDB-5727] pipe task management at config node  (#9533)
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../confignode/client/DataNodeRequestType.java     |   9 +-
 .../client/async/AsyncDataNodeClientPool.java      |  18 +-
 .../client/async/handlers/AsyncClientHandler.java  |  11 -
 .../async/handlers/rpc/OperatePipeRPCHandler.java  |  60 ----
 .../consensus/request/ConfigPhysicalPlan.java      |  56 ++--
 .../consensus/request/ConfigPhysicalPlanType.java  |  35 ++-
 .../task/CreatePipePlanV2.java}                    |  41 +--
 .../task/DropPipePlanV2.java}                      |  12 +-
 .../task/SetPipeStatusPlanV2.java}                 |  34 +--
 ...PipeSinkPlan.java => CreatePipeSinkPlanV1.java} |  10 +-
 .../{DropPipePlan.java => DropPipePlanV1.java}     |  10 +-
 ...opPipeSinkPlan.java => DropPipeSinkPlanV1.java} |  10 +-
 ...GetPipeSinkPlan.java => GetPipeSinkPlanV1.java} |  10 +-
 ...reatePipePlan.java => PreCreatePipePlanV1.java} |  10 +-
 .../request/write/sync/RecordPipeMessagePlan.java  |   4 +-
 ...ipeStatusPlan.java => SetPipeStatusPlanV1.java} |  24 +-
 .../{ShowPipePlan.java => ShowPipePlanV1.java}     |  10 +-
 .../iotdb/confignode/manager/ConfigManager.java    | 103 ++-----
 .../apache/iotdb/confignode/manager/IManager.java  |  42 +--
 .../iotdb/confignode/manager/ProcedureManager.java |  25 +-
 .../confignode/manager/RetryFailedTasksThread.java |  64 +----
 .../iotdb/confignode/manager/SyncManager.java      | 306 ---------------------
 .../iotdb/confignode/manager/load/LoadManager.java |   1 -
 .../iotdb/confignode/manager/node/NodeManager.java |   6 +-
 .../iotdb/confignode/manager/pipe/PipeManager.java |  19 +-
 .../manager/pipe/PipePluginCoordinator.java        |   8 +
 .../manager/pipe/PipeTaskCoordinator.java          |  69 +++++
 .../persistence/executor/ConfigPlanExecutor.java   |  48 ++--
 .../confignode/persistence/pipe/PipeInfo.java      |  59 +++-
 .../persistence/pipe/PipePluginInfo.java           |  45 ++-
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 203 ++++++++++++++
 .../persistence/pipe/PipeTaskOperation.java        |  10 +-
 .../persistence/sync/ClusterSyncInfo.java          | 225 ---------------
 .../procedure/env/ConfigNodeProcedureEnv.java      |  25 ++
 .../pipe/plugin/CreatePipePluginProcedure.java     |  30 +-
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |  24 +-
 .../pipe/task/AbstractOperatePipeProcedureV2.java  | 161 +++++++++++
 .../impl/pipe/task/CreatePipeProcedureV2.java      | 270 ++++++++++++++++++
 .../impl/pipe/task/DropPipeProcedureV2.java        | 157 +++++++++++
 .../impl/pipe/task/StartPipeProcedureV2.java       | 177 ++++++++++++
 .../impl/pipe/task/StopPipeProcedureV2.java        | 177 ++++++++++++
 .../impl/sync/AbstractOperatePipeProcedure.java    | 147 ----------
 .../procedure/impl/sync/CreatePipeProcedure.java   | 131 +--------
 .../procedure/impl/sync/DropPipeProcedure.java     |  97 +------
 .../procedure/impl/sync/StartPipeProcedure.java    | 136 +--------
 .../procedure/impl/sync/StopPipeProcedure.java     | 136 +--------
 .../task/OperatePipeTaskState.java}                |  13 +-
 .../procedure/store/ProcedureFactory.java          |  16 ++
 .../confignode/procedure/store/ProcedureType.java  |  10 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  16 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       | 156 ++++++++---
 .../persistence/ClusterSyncInfoTest.java           | 164 -----------
 .../iotdb/confignode/persistence/PipeInfoTest.java |  99 +++++++
 .../CreatePipePluginProcedureTest.java             |   3 +-
 .../{ => plugin}/DropPipePluginProcedureTest.java  |   3 +-
 .../CreatePipeProcedureV2Test.java}                |  33 ++-
 .../DropPipeProcedureV2Test.java}                  |  12 +-
 .../StartPipeProcedureV2Test.java}                 |  12 +-
 .../StopPipeProcedureV2Test.java}                  |  12 +-
 .../meta/ConfigNodePipePluginMetaKeeper.java       |  26 +-
 .../pipe/plugin/meta/PipePluginMetaKeeper.java     |  22 ++
 .../pipe/task/meta/PipeConsensusGroupTaskMeta.java |  98 +++++++
 .../iotdb/commons/pipe/task/meta/PipeMeta.java     |  95 +++++++
 .../commons/pipe/task/meta/PipeMetaKeeper.java     |  97 +++++++
 .../commons/pipe/task/meta/PipeRuntimeMeta.java    | 142 ++++++++++
 .../commons/pipe/task/meta/PipeStaticMeta.java     | 170 ++++++++++++
 .../{PipeTaskMetaAccessor.java => PipeStatus.java} |  30 +-
 .../iotdb/commons/sync/pipe/SyncOperation.java     |   1 -
 .../plan/statement/crud/InsertRowStatement.java    |   2 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  41 +--
 thrift/src/main/thrift/datanode.thrift             |   8 +-
 71 files changed, 2638 insertions(+), 1908 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 00a9423d44..25f18a59a9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -66,10 +66,13 @@ public enum DataNodeRequestType {
   CREATE_PIPE_PLUGIN,
   DROP_PIPE_PLUGIN,
 
-  /** Sync */
-  PRE_CREATE_PIPE,
+  /** Pipe Task */
+  CREATE_PIPE,
+  /**
+   * DROP_PIPE, START_PIPE and STOP_PIPE are merged into OPERATE_PIPE because these requests
+   * only require the pipe name.
+   */
   OPERATE_PIPE,
-  ROLLBACK_OPERATE_PIPE,
 
   /** CQ */
   EXECUTE_CQ,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 5a9d51ac9b..c641cec211 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -219,6 +219,12 @@ public class AsyncDataNodeClientPool {
               (AsyncTSStatusRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
           break;
+        case CREATE_PIPE:
+          client.createPipeOnDataNode(
+              (TCreatePipeOnDataNodeReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
         case MERGE:
         case FULL_MERGE:
           client.merge(
@@ -301,24 +307,12 @@ public class AsyncDataNodeClientPool {
               (DeleteSchemaRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
           break;
-        case PRE_CREATE_PIPE:
-          client.createPipeOnDataNode(
-              (TCreatePipeOnDataNodeReq) clientHandler.getRequest(requestId),
-              (AsyncTSStatusRPCHandler)
-                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
-          break;
         case OPERATE_PIPE:
           client.operatePipeOnDataNode(
               (TOperatePipeOnDataNodeReq) clientHandler.getRequest(requestId),
               (AsyncTSStatusRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
           break;
-        case ROLLBACK_OPERATE_PIPE:
-          client.operatePipeOnDataNodeForRollback(
-              (TOperatePipeOnDataNodeReq) clientHandler.getRequest(requestId),
-              (AsyncTSStatusRPCHandler)
-                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
-          break;
         case CONSTRUCT_SCHEMA_BLACK_LIST_WITH_TEMPLATE:
           client.constructSchemaBlackListWithTemplate(
               (TConstructSchemaBlackListWithTemplateReq) clientHandler.getRequest(requestId),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index a5b9422ac8..b807b52c5a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHan
 import org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler;
 import org.apache.iotdb.confignode.client.async.handlers.rpc.DeleteSchemaRPCHandler;
 import org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
-import org.apache.iotdb.confignode.client.async.handlers.rpc.OperatePipeRPCHandler;
 import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
 
@@ -176,16 +175,6 @@ public class AsyncClientHandler<Q, R> {
             dataNodeLocationMap,
             (Map<Integer, TFetchSchemaBlackListResp>) responseMap,
             countDownLatch);
-      case PRE_CREATE_PIPE:
-      case OPERATE_PIPE:
-      case ROLLBACK_OPERATE_PIPE:
-        return new OperatePipeRPCHandler(
-            requestType,
-            requestId,
-            targetDataNode,
-            dataNodeLocationMap,
-            (Map<Integer, TSStatus>) responseMap,
-            countDownLatch);
       case COUNT_PATHS_USING_TEMPLATE:
         return new CountPathsUsingTemplateRPCHandler(
             requestType,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/OperatePipeRPCHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/OperatePipeRPCHandler.java
deleted file mode 100644
index 7adf508562..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/OperatePipeRPCHandler.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.client.async.handlers.rpc;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-public class OperatePipeRPCHandler extends AsyncTSStatusRPCHandler {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(OperatePipeRPCHandler.class);
-
-  public OperatePipeRPCHandler(
-      DataNodeRequestType requestType,
-      int requestId,
-      TDataNodeLocation targetDataNode,
-      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
-      Map<Integer, TSStatus> responseMap,
-      CountDownLatch countDownLatch) {
-    super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
-  }
-
-  @Override
-  public void onError(Exception e) {
-    String errorMsg = e.getMessage() + " on DataNode: " + formattedTargetLocation;
-    LOGGER.error(errorMsg);
-
-    responseMap.put(
-        requestId,
-        new TSStatus(
-            RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
-
-    // Always CountDown
-    countDownLatch.countDown();
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index aef3c33276..87e2a32258 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -77,6 +77,9 @@ import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchem
 import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -85,14 +88,14 @@ import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGr
 import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlanV1;
 import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlanV1;
 import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
@@ -339,30 +342,39 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
         case UpdateRegionLocation:
           plan = new UpdateRegionLocationPlan();
           break;
-        case CreatePipeSink:
-          plan = new CreatePipeSinkPlan();
+        case CreatePipeSinkV1:
+          plan = new CreatePipeSinkPlanV1();
           break;
-        case DropPipeSink:
-          plan = new DropPipeSinkPlan();
+        case DropPipeSinkV1:
+          plan = new DropPipeSinkPlanV1();
           break;
-        case GetPipeSink:
-          plan = new GetPipeSinkPlan();
+        case GetPipeSinkV1:
+          plan = new GetPipeSinkPlanV1();
           break;
-        case PreCreatePipe:
-          plan = new PreCreatePipePlan();
+        case PreCreatePipeV1:
+          plan = new PreCreatePipePlanV1();
           break;
-        case SetPipeStatus:
-          plan = new SetPipeStatusPlan();
+        case SetPipeStatusV1:
+          plan = new SetPipeStatusPlanV1();
           break;
-        case DropPipe:
-          plan = new DropPipePlan();
+        case DropPipeV1:
+          plan = new DropPipePlanV1();
           break;
-        case ShowPipe:
-          plan = new ShowPipePlan();
+        case ShowPipeV1:
+          plan = new ShowPipePlanV1();
           break;
-        case RecordPipeMessage:
+        case RecordPipeMessageV1:
           plan = new RecordPipeMessagePlan();
           break;
+        case CreatePipeV2:
+          plan = new CreatePipePlanV2();
+          break;
+        case SetPipeStatusV2:
+          plan = new SetPipeStatusPlanV2();
+          break;
+        case DropPipeV2:
+          plan = new DropPipePlanV2();
+          break;
         case GetRegionId:
           plan = new GetRegionIdPlan();
           break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index b391c3c086..32efc881a7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -114,15 +114,23 @@ public enum ConfigPhysicalPlanType {
   UnsetTemplate((short) 810),
   DropSchemaTemplate((short) 811),
 
-  /** Sync */
-  CreatePipeSink((short) 900),
-  DropPipeSink((short) 901),
-  GetPipeSink((short) 902),
-  PreCreatePipe((short) 903),
-  SetPipeStatus((short) 904),
-  DropPipe((short) 905),
-  ShowPipe((short) 906),
-  RecordPipeMessage((short) 907),
+  /** Deprecated plan types for sync, retained for upgrade compatibility */
+  @Deprecated
+  CreatePipeSinkV1((short) 900),
+  @Deprecated
+  DropPipeSinkV1((short) 901),
+  @Deprecated
+  GetPipeSinkV1((short) 902),
+  @Deprecated
+  PreCreatePipeV1((short) 903),
+  @Deprecated
+  SetPipeStatusV1((short) 904),
+  @Deprecated
+  DropPipeV1((short) 905),
+  @Deprecated
+  ShowPipeV1((short) 906),
+  @Deprecated
+  RecordPipeMessageV1((short) 907),
 
   /** Trigger */
   AddTriggerInTable((short) 1000),
@@ -158,7 +166,14 @@ public enum ConfigPhysicalPlanType {
 
   /** Quota */
   setSpaceQuota((short) 1400),
-  setThrottleQuota((short) 1401);
+  setThrottleQuota((short) 1401),
+
+  /** Pipe Task */
+  CreatePipeV2((short) 1500),
+  /** START PIPE & STOP PIPE */
+  SetPipeStatusV2((short) 1501),
+  DropPipeV2((short) 1502),
+  ;
 
   private final short planType;
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/CreatePipePlanV2.java
similarity index 56%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/CreatePipePlanV2.java
index d29aa92a97..c37f77cdb1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/CreatePipePlanV2.java
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.consensus.request.write.sync;
 
-import org.apache.iotdb.commons.sync.pipe.PipeMessage;
-import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
+package org.apache.iotdb.confignode.consensus.request.write.pipe.task;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 
@@ -27,39 +28,39 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class RecordPipeMessagePlan extends ConfigPhysicalPlan {
+public class CreatePipePlanV2 extends ConfigPhysicalPlan {
 
-  private String pipeName;
-  private PipeMessage pipeMessage;
+  private PipeStaticMeta pipeStaticMeta;
+  private PipeRuntimeMeta pipeRuntimeMeta;
 
-  public RecordPipeMessagePlan() {
-    super(ConfigPhysicalPlanType.RecordPipeMessage);
+  public CreatePipePlanV2() {
+    super(ConfigPhysicalPlanType.CreatePipeV2);
   }
 
-  public RecordPipeMessagePlan(String pipeName, PipeMessage pipeMessage) {
-    this();
-    this.pipeName = pipeName;
-    this.pipeMessage = pipeMessage;
+  public CreatePipePlanV2(PipeStaticMeta pipeStaticMeta, PipeRuntimeMeta pipeRuntimeMeta) {
+    super(ConfigPhysicalPlanType.CreatePipeV2);
+    this.pipeStaticMeta = pipeStaticMeta;
+    this.pipeRuntimeMeta = pipeRuntimeMeta;
   }
 
-  public String getPipeName() {
-    return pipeName;
+  public PipeStaticMeta getPipeStaticMeta() {
+    return pipeStaticMeta;
   }
 
-  public PipeMessage getPipeMessage() {
-    return pipeMessage;
+  public PipeRuntimeMeta getPipeRuntimeMeta() {
+    return pipeRuntimeMeta;
   }
 
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
     stream.writeShort(getType().getPlanType());
-    BasicStructureSerDeUtil.write(pipeName, stream);
-    pipeMessage.serialize(stream);
+    pipeStaticMeta.serialize(stream);
+    pipeRuntimeMeta.serialize(stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
-    pipeName = BasicStructureSerDeUtil.readString(buffer);
-    pipeMessage = PipeMessage.deserialize(buffer);
+    pipeStaticMeta = PipeStaticMeta.deserialize(buffer);
+    pipeRuntimeMeta = PipeRuntimeMeta.deserialize(buffer);
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/DropPipePlanV2.java
similarity index 84%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/DropPipePlanV2.java
index 96f0dd9f53..15b0ea8440 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/DropPipePlanV2.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.consensus.request.write.sync;
+package org.apache.iotdb.confignode.consensus.request.write.pipe.task;
 
 import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
@@ -26,16 +26,16 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class DropPipePlan extends ConfigPhysicalPlan {
+public class DropPipePlanV2 extends ConfigPhysicalPlan {
 
   private String pipeName;
 
-  public DropPipePlan() {
-    super(ConfigPhysicalPlanType.DropPipe);
+  public DropPipePlanV2() {
+    super(ConfigPhysicalPlanType.DropPipeV2);
   }
 
-  public DropPipePlan(String pipeName) {
-    this();
+  public DropPipePlanV2(String pipeName) {
+    super(ConfigPhysicalPlanType.DropPipeV2);
     this.pipeName = pipeName;
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusPlanV2.java
similarity index 68%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusPlanV2.java
index 999ba3aea5..d785204688 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusPlanV2.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.consensus.request.write.sync;
 
-import org.apache.iotdb.commons.sync.pipe.PipeStatus;
+package org.apache.iotdb.confignode.consensus.request.write.pipe.task;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -27,46 +28,39 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class SetPipeStatusPlan extends ConfigPhysicalPlan {
+public class SetPipeStatusPlanV2 extends ConfigPhysicalPlan {
+
   private String pipeName;
-  private PipeStatus pipeStatus;
+  private PipeStatus status;
 
-  public SetPipeStatusPlan() {
-    super(ConfigPhysicalPlanType.SetPipeStatus);
+  public SetPipeStatusPlanV2() {
+    super(ConfigPhysicalPlanType.SetPipeStatusV2);
   }
 
-  public SetPipeStatusPlan(String pipeName, PipeStatus pipeStatus) {
-    this();
+  public SetPipeStatusPlanV2(String pipeName, PipeStatus status) {
+    super(ConfigPhysicalPlanType.SetPipeStatusV2);
     this.pipeName = pipeName;
-    this.pipeStatus = pipeStatus;
+    this.status = status;
   }
 
   public String getPipeName() {
     return pipeName;
   }
 
-  public void setPipeName(String pipeName) {
-    this.pipeName = pipeName;
-  }
-
   public PipeStatus getPipeStatus() {
-    return pipeStatus;
-  }
-
-  public void setPipeStatus(PipeStatus pipeStatus) {
-    this.pipeStatus = pipeStatus;
+    return status;
   }
 
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
     stream.writeShort(getType().getPlanType());
     ReadWriteIOUtils.write(pipeName, stream);
-    ReadWriteIOUtils.write((byte) pipeStatus.ordinal(), stream);
+    ReadWriteIOUtils.write(status.getType(), stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
     pipeName = ReadWriteIOUtils.readString(buffer);
-    pipeStatus = PipeStatus.values()[ReadWriteIOUtils.readByte(buffer)];
+    status = PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(buffer));
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/CreatePipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/CreatePipeSinkPlanV1.java
similarity index 87%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/CreatePipeSinkPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/CreatePipeSinkPlanV1.java
index 1729f96765..d357ccf8a3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/CreatePipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/CreatePipeSinkPlanV1.java
@@ -27,15 +27,17 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class CreatePipeSinkPlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class CreatePipeSinkPlanV1 extends ConfigPhysicalPlan {
 
   private TPipeSinkInfo pipeSinkInfo;
 
-  public CreatePipeSinkPlan() {
-    super(ConfigPhysicalPlanType.CreatePipeSink);
+  public CreatePipeSinkPlanV1() {
+    super(ConfigPhysicalPlanType.CreatePipeSinkV1);
   }
 
-  public CreatePipeSinkPlan(TPipeSinkInfo pipeSinkInfo) {
+  public CreatePipeSinkPlanV1(TPipeSinkInfo pipeSinkInfo) {
     this();
     this.pipeSinkInfo = pipeSinkInfo;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlanV1.java
similarity index 88%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlanV1.java
index 96f0dd9f53..7dbcde6d9f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipePlanV1.java
@@ -26,15 +26,17 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class DropPipePlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class DropPipePlanV1 extends ConfigPhysicalPlan {
 
   private String pipeName;
 
-  public DropPipePlan() {
-    super(ConfigPhysicalPlanType.DropPipe);
+  public DropPipePlanV1() {
+    super(ConfigPhysicalPlanType.DropPipeV1);
   }
 
-  public DropPipePlan(String pipeName) {
+  public DropPipePlanV1(String pipeName) {
     this();
     this.pipeName = pipeName;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipeSinkPlanV1.java
similarity index 87%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipeSinkPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipeSinkPlanV1.java
index 9ac2c09f6c..f50a8e1cd1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/DropPipeSinkPlanV1.java
@@ -26,15 +26,17 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class DropPipeSinkPlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class DropPipeSinkPlanV1 extends ConfigPhysicalPlan {
 
   private String pipeSinkName;
 
-  public DropPipeSinkPlan() {
-    super(ConfigPhysicalPlanType.DropPipeSink);
+  public DropPipeSinkPlanV1() {
+    super(ConfigPhysicalPlanType.DropPipeSinkV1);
   }
 
-  public DropPipeSinkPlan(String pipeSinkName) {
+  public DropPipeSinkPlanV1(String pipeSinkName) {
     this();
     this.pipeSinkName = pipeSinkName;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlanV1.java
similarity index 87%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlanV1.java
index 7014725438..26c4d2860b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/GetPipeSinkPlanV1.java
@@ -26,15 +26,17 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class GetPipeSinkPlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class GetPipeSinkPlanV1 extends ConfigPhysicalPlan {
   /** empty pipeSinkName means get all PIPESINK */
   private String pipeSinkName;
 
-  public GetPipeSinkPlan() {
-    super(ConfigPhysicalPlanType.GetPipeSink);
+  public GetPipeSinkPlanV1() {
+    super(ConfigPhysicalPlanType.GetPipeSinkV1);
   }
 
-  public GetPipeSinkPlan(String pipeSinkName) {
+  public GetPipeSinkPlanV1(String pipeSinkName) {
     this();
     this.pipeSinkName = pipeSinkName;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlanV1.java
similarity index 86%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlanV1.java
index 34665947e9..24c3125fc1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/PreCreatePipePlanV1.java
@@ -26,15 +26,17 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class PreCreatePipePlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class PreCreatePipePlanV1 extends ConfigPhysicalPlan {
 
   private PipeInfo pipeInfo;
 
-  public PreCreatePipePlan() {
-    super(ConfigPhysicalPlanType.PreCreatePipe);
+  public PreCreatePipePlanV1() {
+    super(ConfigPhysicalPlanType.PreCreatePipeV1);
   }
 
-  public PreCreatePipePlan(PipeInfo pipeInfo) {
+  public PreCreatePipePlanV1(PipeInfo pipeInfo) {
     this();
     this.pipeInfo = pipeInfo;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java
index d29aa92a97..3fe6b8c8fa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/RecordPipeMessagePlan.java
@@ -27,13 +27,15 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+// Deprecated, restored for upgrade
+@Deprecated
 public class RecordPipeMessagePlan extends ConfigPhysicalPlan {
 
   private String pipeName;
   private PipeMessage pipeMessage;
 
   public RecordPipeMessagePlan() {
-    super(ConfigPhysicalPlanType.RecordPipeMessage);
+    super(ConfigPhysicalPlanType.RecordPipeMessageV1);
   }
 
   public RecordPipeMessagePlan(String pipeName, PipeMessage pipeMessage) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlanV1.java
similarity index 81%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlanV1.java
index 999ba3aea5..4aa0f9668e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/SetPipeStatusPlanV1.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.consensus.request.write.sync;
 
 import org.apache.iotdb.commons.sync.pipe.PipeStatus;
@@ -27,36 +28,31 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class SetPipeStatusPlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class SetPipeStatusPlanV1 extends ConfigPhysicalPlan {
   private String pipeName;
+
   private PipeStatus pipeStatus;
 
-  public SetPipeStatusPlan() {
-    super(ConfigPhysicalPlanType.SetPipeStatus);
+  public SetPipeStatusPlanV1() {
+    super(ConfigPhysicalPlanType.SetPipeStatusV1);
   }
 
-  public SetPipeStatusPlan(String pipeName, PipeStatus pipeStatus) {
-    this();
+  public SetPipeStatusPlanV1(String pipeName, PipeStatus status) {
+    super(ConfigPhysicalPlanType.SetPipeStatusV1);
     this.pipeName = pipeName;
-    this.pipeStatus = pipeStatus;
+    this.pipeStatus = status;
   }
 
   public String getPipeName() {
     return pipeName;
   }
 
-  public void setPipeName(String pipeName) {
-    this.pipeName = pipeName;
-  }
-
   public PipeStatus getPipeStatus() {
     return pipeStatus;
   }
 
-  public void setPipeStatus(PipeStatus pipeStatus) {
-    this.pipeStatus = pipeStatus;
-  }
-
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
     stream.writeShort(getType().getPlanType());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlanV1.java
similarity index 88%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlanV1.java
index cc01a7d53a..0acf6c277d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/sync/ShowPipePlanV1.java
@@ -26,15 +26,17 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-public class ShowPipePlan extends ConfigPhysicalPlan {
+// Deprecated, restored for upgrade
+@Deprecated
+public class ShowPipePlanV1 extends ConfigPhysicalPlan {
   /** empty pipeName means show all PIPE */
   private String pipeName;
 
-  public ShowPipePlan() {
-    super(ConfigPhysicalPlanType.ShowPipe);
+  public ShowPipePlanV1() {
+    super(ConfigPhysicalPlanType.ShowPipeV1);
   }
 
-  public ShowPipePlan(String pipeName) {
+  public ShowPipePlanV1(String pipeName) {
     this();
     this.pipeName = pipeName;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 96ea55f134..69cf4a447f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -39,7 +39,6 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.service.metric.MetricService;
-import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.utils.AuthUtils;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.StatusUtils;
@@ -66,8 +65,6 @@ import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
 import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
 import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp;
 import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
@@ -104,7 +101,6 @@ import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
 import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
-import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
@@ -134,8 +130,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
@@ -230,8 +224,6 @@ public class ConfigManager implements IManager {
 
   /** Manage Trigger. */
   private final TriggerManager triggerManager;
-  /** Sync. */
-  private final SyncManager syncManager;
 
   /** CQ. */
   private final CQManager cqManager;
@@ -260,7 +252,6 @@ public class ConfigManager implements IManager {
     ProcedureInfo procedureInfo = new ProcedureInfo();
     UDFInfo udfInfo = new UDFInfo();
     TriggerInfo triggerInfo = new TriggerInfo();
-    ClusterSyncInfo syncInfo = new ClusterSyncInfo();
     CQInfo cqInfo = new CQInfo();
     ModelInfo modelInfo = new ModelInfo();
     PipeInfo pipeInfo = new PipeInfo();
@@ -276,7 +267,6 @@ public class ConfigManager implements IManager {
             procedureInfo,
             udfInfo,
             triggerInfo,
-            syncInfo,
             cqInfo,
             modelInfo,
             pipeInfo,
@@ -291,7 +281,6 @@ public class ConfigManager implements IManager {
     this.procedureManager = new ProcedureManager(this, procedureInfo);
     this.udfManager = new UDFManager(this, udfInfo);
     this.triggerManager = new TriggerManager(this, triggerInfo);
-    this.syncManager = new SyncManager(this, syncInfo);
     this.cqManager = new CQManager(this);
     this.loadManager = new LoadManager(this);
     this.modelManager = new ModelManager(this, modelInfo);
@@ -931,10 +920,6 @@ public class ConfigManager implements IManager {
   }
 
   @Override
-  public SyncManager getSyncManager() {
-    return syncManager;
-  }
-
   public ModelManager getModelManager() {
     return modelManager;
   }
@@ -1569,83 +1554,36 @@ public class ConfigManager implements IManager {
     }
   }
 
-  @Override
-  public TSStatus createPipeSink(CreatePipeSinkPlan plan) {
-    TSStatus status = confirmLeader();
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return syncManager.createPipeSink(plan);
-    } else {
-      return status;
-    }
-  }
-
-  @Override
-  public TSStatus dropPipeSink(DropPipeSinkPlan plan) {
-    TSStatus status = confirmLeader();
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return syncManager.dropPipeSink(plan);
-    } else {
-      return status;
-    }
-  }
-
-  @Override
-  public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) {
-    TSStatus status = confirmLeader();
-    TGetPipeSinkResp resp = new TGetPipeSinkResp();
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return syncManager.getPipeSink(req.getPipeSinkName());
-    } else {
-      return resp.setStatus(status);
-    }
-  }
-
   @Override
   public TSStatus createPipe(TCreatePipeReq req) {
     TSStatus status = confirmLeader();
-    LOGGER.info("createPipe: {}", req);
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      // TODO: Implement PipeManager
-      return status;
-    } else {
-      return status;
-    }
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? pipeManager.getPipeTaskCoordinator().createPipe(req)
+        : status;
   }
 
   @Override
   public TSStatus startPipe(String pipeName) {
     TSStatus status = confirmLeader();
-    LOGGER.info("startPipe: {}", pipeName);
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      // TODO: Implement PipeManager
-      return status;
-    } else {
-      return status;
-    }
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? pipeManager.getPipeTaskCoordinator().startPipe(pipeName)
+        : status;
   }
 
   @Override
   public TSStatus stopPipe(String pipeName) {
     TSStatus status = confirmLeader();
-    LOGGER.info("stopPipe: {}", pipeName);
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      // TODO: Implement PipeManager
-      return status;
-    } else {
-      return status;
-    }
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? pipeManager.getPipeTaskCoordinator().stopPipe(pipeName)
+        : status;
   }
 
   @Override
   public TSStatus dropPipe(String pipeName) {
     TSStatus status = confirmLeader();
-    LOGGER.info("dropPipe: {}", pipeName);
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      // TODO: Implement PipeManager
-      return status;
-    } else {
-      return status;
-    }
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? pipeManager.getPipeTaskCoordinator().dropPipe(pipeName)
+        : status;
   }
 
   @Override
@@ -1664,22 +1602,17 @@ public class ConfigManager implements IManager {
   @Override
   public TGetAllPipeInfoResp getAllPipeInfo() {
     TSStatus status = confirmLeader();
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return syncManager.getAllPipeInfo();
-    } else {
-      return new TGetAllPipeInfoResp().setStatus(status);
-    }
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? pipeManager.getPipeTaskCoordinator().showPipes()
+        : new TGetAllPipeInfoResp().setStatus(status);
   }
 
   @Override
   public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
     TSStatus status = confirmLeader();
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return syncManager.recordPipeMessage(
-          req.getPipeName(), PipeMessage.deserialize(ByteBuffer.wrap(req.getMessage())));
-    } else {
-      return status;
-    }
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? pipeManager.getPipeTaskCoordinator().recordPipeMessage(req)
+        : status;
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 3d6490db57..9726b93b2d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -41,8 +41,6 @@ import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaRep
 import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
 import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
 import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.cq.CQManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
@@ -76,8 +74,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
@@ -168,13 +164,6 @@ public interface IManager {
    */
   TriggerManager getTriggerManager();
 
-  /**
-   * Get SyncManager
-   *
-   * @return SyncManager instance
-   */
-  SyncManager getSyncManager();
-
   /**
    * Get ProcedureManager
    *
@@ -189,6 +178,13 @@ public interface IManager {
    */
   CQManager getCQManager();
 
+  /**
+   * Get ModelManager
+   *
+   * @return ModelManager instance
+   */
+  ModelManager getModelManager();
+
   /**
    * Get PipeManager
    *
@@ -549,30 +545,6 @@ public interface IManager {
    */
   TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req);
 
-  /**
-   * Create PipeSink
-   *
-   * @param plan Info about PipeSink
-   * @return TSStatus
-   */
-  TSStatus createPipeSink(CreatePipeSinkPlan plan);
-
-  /**
-   * Drop PipeSink
-   *
-   * @param plan Name of PipeSink
-   * @return TSStatus
-   */
-  TSStatus dropPipeSink(DropPipeSinkPlan plan);
-
-  /**
-   * Get PipeSink by name. If pipeSinkName is empty, get all PipeSinks.
-   *
-   * @param req specify the pipeSinkName
-   * @return TGetPipeSinkResp contains the PipeSink
-   */
-  TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req);
-
   /**
    * Create Pipe
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 63a78d36c1..6b03eea066 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.model.ModelInformation;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
@@ -54,16 +53,16 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure
 import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
 import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
 import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
 import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
 import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
 import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
-import org.apache.iotdb.confignode.procedure.impl.sync.CreatePipeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.sync.DropPipeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.sync.StartPipeProcedure;
-import org.apache.iotdb.confignode.procedure.impl.sync.StopPipeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.trigger.CreateTriggerProcedure;
 import org.apache.iotdb.confignode.procedure.impl.trigger.DropTriggerProcedure;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
@@ -624,7 +623,7 @@ public class ProcedureManager {
 
   public TSStatus createPipe(TCreatePipeReq req) {
     try {
-      long procedureId = executor.submitProcedure(new CreatePipeProcedure(req));
+      long procedureId = executor.submitProcedure(new CreatePipeProcedureV2(req));
       List<TSStatus> statusList = new ArrayList<>();
       boolean isSucceed =
           waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
@@ -634,14 +633,14 @@ public class ProcedureManager {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             .setMessage(statusList.get(0).getMessage());
       }
-    } catch (PipeException e) {
+    } catch (Exception e) {
       return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
     }
   }
 
   public TSStatus startPipe(String pipeName) {
     try {
-      long procedureId = executor.submitProcedure(new StartPipeProcedure(pipeName));
+      long procedureId = executor.submitProcedure(new StartPipeProcedureV2(pipeName));
       List<TSStatus> statusList = new ArrayList<>();
       boolean isSucceed =
           waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
@@ -651,14 +650,14 @@ public class ProcedureManager {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             .setMessage(statusList.get(0).getMessage());
       }
-    } catch (PipeException e) {
+    } catch (Exception e) {
       return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
     }
   }
 
   public TSStatus stopPipe(String pipeName) {
     try {
-      long procedureId = executor.submitProcedure(new StopPipeProcedure(pipeName));
+      long procedureId = executor.submitProcedure(new StopPipeProcedureV2(pipeName));
       List<TSStatus> statusList = new ArrayList<>();
       boolean isSucceed =
           waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
@@ -668,14 +667,14 @@ public class ProcedureManager {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             .setMessage(statusList.get(0).getMessage());
       }
-    } catch (PipeException e) {
+    } catch (Exception e) {
       return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
     }
   }
 
   public TSStatus dropPipe(String pipeName) {
     try {
-      long procedureId = executor.submitProcedure(new DropPipeProcedure(pipeName));
+      long procedureId = executor.submitProcedure(new DropPipeProcedureV2(pipeName));
       List<TSStatus> statusList = new ArrayList<>();
       boolean isSucceed =
           waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
@@ -685,7 +684,7 @@ public class ProcedureManager {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             .setMessage(statusList.get(0).getMessage());
       }
-    } catch (PipeException e) {
+    } catch (Exception e) {
       return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
     }
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
index 97666a005b..c9ffdd67b8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
@@ -23,35 +23,26 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
-import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
- * The RetryFailedTasksThread executed periodically to retry failed tasks in Trigger, Sync, Template
- * and CQ
+ * The RetryFailedTasksThread executed periodically to retry failed tasks in Trigger, Template and
+ * CQ
  */
 public class RetryFailedTasksThread {
 
@@ -69,10 +60,6 @@ public class RetryFailedTasksThread {
   /** Trigger */
   private final Set<TDataNodeLocation> oldUnknownNodes;
 
-  /** Sync */
-  private final Map<Integer, Queue<TOperatePipeOnDataNodeReq>> messageMap =
-      new ConcurrentHashMap<>();
-
   public RetryFailedTasksThread(IManager configManager) {
     this.configManager = configManager;
     this.nodeManager = configManager.getNodeManager();
@@ -108,9 +95,6 @@ public class RetryFailedTasksThread {
   private void retryFailedTasks() {
     // trigger
     triggerDetectTask();
-
-    // sync
-    syncDetectTask();
   }
 
   /**
@@ -148,48 +132,4 @@ public class RetryFailedTasksThread {
       }
     }
   }
-
-  public void retryRollbackReq(List<Integer> dataNodeIds, TOperatePipeOnDataNodeReq req) {
-    for (int id : dataNodeIds) {
-      messageMap.computeIfAbsent(id, i -> new LinkedList<>()).add(req);
-    }
-  }
-
-  /**
-   * The syncDetectTask executed periodically to roll back the failed requests in operating pipe.
-   */
-  private void syncDetectTask() {
-    for (Map.Entry<Integer, Queue<TOperatePipeOnDataNodeReq>> entry : messageMap.entrySet()) {
-      int dataNodeId = entry.getKey();
-      if (NodeStatus.Running.equals(nodeManager.getNodeStatusByNodeId(dataNodeId))) {
-        final Map<Integer, TDataNodeLocation> dataNodeLocationMap = new HashMap<>();
-        dataNodeLocationMap.put(
-            dataNodeId, nodeManager.getRegisteredDataNodeLocations().get(dataNodeId));
-        TOperatePipeOnDataNodeReq request;
-        while ((request = entry.getValue().peek()) != null) {
-          AsyncClientHandler<TOperatePipeOnDataNodeReq, TSStatus> clientHandler =
-              new AsyncClientHandler<>(
-                  DataNodeRequestType.ROLLBACK_OPERATE_PIPE, request, dataNodeLocationMap);
-          AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-          TSStatus tsStatus = clientHandler.getResponseList().get(0);
-          if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-            entry.getValue().poll();
-          } else if (tsStatus.getCode() == TSStatusCode.PIPE_ERROR.getStatusCode()) {
-            // skip
-            LOGGER.warn(
-                String.format(
-                    "Roll back failed because %s. Skip this roll back request [%s].",
-                    tsStatus.getMessage(), request));
-          } else {
-            // connection failure, keep and retry.
-            LOGGER.error(
-                String.format(
-                    "Roll back failed because %s. This roll back request [%s] will be retried later.",
-                    tsStatus.getMessage(), request));
-            break;
-          }
-        }
-      }
-    }
-  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
deleted file mode 100644
index dc2c7bdd95..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.manager;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.exception.sync.PipeException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkNotExistException;
-import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipe.PipeMessage;
-import org.apache.iotdb.commons.sync.pipe.PipeStatus;
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.commons.sync.pipesink.PipeSink;
-import org.apache.iotdb.commons.utils.StatusUtils;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
-import org.apache.iotdb.confignode.consensus.response.pipe.PipeResp;
-import org.apache.iotdb.confignode.consensus.response.pipe.PipeSinkResp;
-import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
-import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
-import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
-import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
-import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import com.google.common.eventbus.AllowConcurrentEvents;
-import com.google.common.eventbus.Subscribe;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-public class SyncManager {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(SyncManager.class);
-
-  private final IManager configManager;
-  private final ClusterSyncInfo clusterSyncInfo;
-
-  public SyncManager(IManager configManager, ClusterSyncInfo clusterSyncInfo) {
-    this.configManager = configManager;
-    this.clusterSyncInfo = clusterSyncInfo;
-  }
-
-  public void lockSyncMetadata() {
-    clusterSyncInfo.lockSyncMetadata();
-  }
-
-  public void unlockSyncMetadata() {
-    clusterSyncInfo.unlockSyncMetadata();
-  }
-
-  // ======================================================
-  // region Implement of PipeSink
-  // ======================================================
-
-  public TSStatus createPipeSink(CreatePipeSinkPlan plan) {
-    try {
-      clusterSyncInfo.checkAddPipeSink(plan);
-      return getConsensusManager().write(plan).getStatus();
-    } catch (PipeSinkException e) {
-      LOGGER.error(e.getMessage());
-      return new TSStatus(TSStatusCode.CREATE_PIPE_SINK_ERROR.getStatusCode())
-          .setMessage(e.getMessage());
-    }
-  }
-
-  public TSStatus dropPipeSink(DropPipeSinkPlan plan) {
-    try {
-      clusterSyncInfo.checkDropPipeSink(plan.getPipeSinkName());
-      return getConsensusManager().write(plan).getStatus();
-    } catch (PipeSinkException e) {
-      LOGGER.error(e.getMessage());
-      return new TSStatus(TSStatusCode.CREATE_PIPE_SINK_ERROR.getStatusCode())
-          .setMessage(e.getMessage());
-    }
-  }
-
-  public TGetPipeSinkResp getPipeSink(String pipeSinkName) {
-    GetPipeSinkPlan getPipeSinkPlan = new GetPipeSinkPlan(pipeSinkName);
-    PipeSinkResp pipeSinkResp =
-        (PipeSinkResp) getConsensusManager().read(getPipeSinkPlan).getDataset();
-    TGetPipeSinkResp resp = new TGetPipeSinkResp();
-    resp.setStatus(pipeSinkResp.getStatus());
-    if (pipeSinkResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      resp.setPipeSinkInfoList(
-          pipeSinkResp.getPipeSinkList().stream()
-              .map(PipeSink::getTPipeSinkInfo)
-              .collect(Collectors.toList()));
-    }
-    return resp;
-  }
-
-  // endregion
-
-  // ======================================================
-  // region Implement of Pipe
-  // ======================================================
-
-  public void checkAddPipe(PipeInfo pipeInfo) throws PipeException, PipeSinkNotExistException {
-    clusterSyncInfo.checkAddPipe(pipeInfo);
-  }
-
-  public TSStatus preCreatePipe(PipeInfo pipeInfo) {
-    pipeInfo.setStatus(PipeStatus.PARTIAL_CREATE);
-    return getConsensusManager().write(new PreCreatePipePlan(pipeInfo)).getStatus();
-  }
-
-  public TSStatus setPipeStatus(String pipeName, PipeStatus pipeStatus) {
-    return getConsensusManager().write(new SetPipeStatusPlan(pipeName, pipeStatus)).getStatus();
-  }
-
-  public TSStatus dropPipe(String pipeName) {
-    return getConsensusManager().write(new DropPipePlan(pipeName)).getStatus();
-  }
-
-  public TSStatus recordPipeMessage(String pipeName, PipeMessage pipeMessage) {
-    return getConsensusManager()
-        .write(new RecordPipeMessagePlan(pipeName, pipeMessage))
-        .getStatus();
-  }
-
-  public TShowPipeResp showPipe(String pipeName) {
-    ShowPipePlan showPipePlan = new ShowPipePlan(pipeName);
-    PipeResp pipeResp = (PipeResp) getConsensusManager().read(showPipePlan).getDataset();
-    TShowPipeResp resp = new TShowPipeResp();
-    resp.setStatus(pipeResp.getStatus());
-    if (pipeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      resp.setPipeInfoList(
-          pipeResp.getPipeInfoList().stream()
-              .map(PipeInfo::getTShowPipeInfo)
-              .collect(Collectors.toList()));
-    }
-    return resp;
-  }
-
-  public PipeInfo getPipeInfo(String pipeName) throws PipeException {
-    return clusterSyncInfo.getPipeInfo(pipeName);
-  }
-
-  public TGetAllPipeInfoResp getAllPipeInfo() {
-    try {
-      // Should lock SyncMetadata to block operation PIPE procedure
-      lockSyncMetadata();
-      TGetAllPipeInfoResp resp = new TGetAllPipeInfoResp();
-      resp.setStatus(StatusUtils.OK);
-      resp.setAllPipeInfo(
-          clusterSyncInfo.getAllPipeInfos().stream()
-              .map(PipeInfo::serializeToByteBuffer)
-              .collect(Collectors.toList()));
-      return resp;
-    } finally {
-      unlockSyncMetadata();
-    }
-  }
-
-  /**
-   * Broadcast DataNodes to operate PIPE operation.
-   *
-   * @param pipeName name of PIPE
-   * @param operation only support {@link SyncOperation#START_PIPE}, {@link SyncOperation#STOP_PIPE}
-   *     and {@link SyncOperation#DROP_PIPE}
-   * @return Map key is DataNodeId and value is TSStatus
-   */
-  public Map<Integer, TSStatus> operatePipeOnDataNodes(String pipeName, SyncOperation operation) {
-    NodeManager nodeManager = configManager.getNodeManager();
-    final Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
-    nodeManager
-        .filterDataNodeThroughStatus(NodeStatus.Running)
-        .forEach(
-            dataNodeConfiguration ->
-                dataNodeLocationMap.put(
-                    dataNodeConfiguration.getLocation().getDataNodeId(),
-                    dataNodeConfiguration.getLocation()));
-    final TOperatePipeOnDataNodeReq request =
-        new TOperatePipeOnDataNodeReq(pipeName, (byte) operation.ordinal());
-
-    AsyncClientHandler<TOperatePipeOnDataNodeReq, TSStatus> clientHandler =
-        new AsyncClientHandler<>(DataNodeRequestType.OPERATE_PIPE, request, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-
-    return clientHandler.getResponseMap();
-  }
-
-  /**
-   * Broadcast DataNodes to operate PIPE operation for roll back procedure.
-   *
-   * @param pipeName name of PIPE
-   * @param createTime specifies the version of the pipe that needs to be rolled back to avoid
-   *     concurrent errors caused by retry requests
-   * @param operation only support {@link SyncOperation#START_PIPE}, {@link SyncOperation#STOP_PIPE}
-   *     and {@link SyncOperation#DROP_PIPE}
-   * @param dataNodeIds target DataNodeId set
-   */
-  public void operatePipeOnDataNodesForRollback(
-      String pipeName, long createTime, SyncOperation operation, Set<Integer> dataNodeIds) {
-    NodeManager nodeManager = configManager.getNodeManager();
-    final Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
-    nodeManager.filterDataNodeThroughStatus(NodeStatus.Running).stream()
-        .filter(
-            dataNodeConfiguration ->
-                dataNodeIds.contains(dataNodeConfiguration.getLocation().dataNodeId))
-        .forEach(
-            dataNodeConfiguration ->
-                dataNodeLocationMap.put(
-                    dataNodeConfiguration.getLocation().getDataNodeId(),
-                    dataNodeConfiguration.getLocation()));
-    final TOperatePipeOnDataNodeReq request =
-        new TOperatePipeOnDataNodeReq(pipeName, (byte) operation.ordinal())
-            .setCreateTime(createTime);
-
-    AsyncClientHandler<TOperatePipeOnDataNodeReq, TSStatus> clientHandler =
-        new AsyncClientHandler<>(
-            DataNodeRequestType.ROLLBACK_OPERATE_PIPE, request, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-
-    List<Integer> failedRollbackDataNodeId = new ArrayList<>();
-    for (Map.Entry<Integer, TSStatus> responseEntry : clientHandler.getResponseMap().entrySet()) {
-      if (responseEntry.getValue().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        failedRollbackDataNodeId.add(responseEntry.getKey());
-      }
-    }
-    configManager.getRetryFailedTasksThread().retryRollbackReq(failedRollbackDataNodeId, request);
-  }
-
-  /**
-   * Broadcast DataNodes to pre create PIPE
-   *
-   * @param pipeInfo pipeInfo
-   * @return Map key is DataNodeId and value is TSStatus
-   */
-  public Map<Integer, TSStatus> preCreatePipeOnDataNodes(PipeInfo pipeInfo) {
-    NodeManager nodeManager = configManager.getNodeManager();
-    final Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
-    nodeManager
-        .filterDataNodeThroughStatus(NodeStatus.Running)
-        .forEach(
-            dataNodeConfiguration ->
-                dataNodeLocationMap.put(
-                    dataNodeConfiguration.getLocation().getDataNodeId(),
-                    dataNodeConfiguration.getLocation()));
-    final TCreatePipeOnDataNodeReq request =
-        new TCreatePipeOnDataNodeReq(pipeInfo.serializeToByteBuffer());
-
-    AsyncClientHandler<TCreatePipeOnDataNodeReq, TSStatus> clientHandler =
-        new AsyncClientHandler<>(DataNodeRequestType.PRE_CREATE_PIPE, request, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-
-    return clientHandler.getResponseMap();
-  }
-
-  /**
-   * When some Nodes' states changed during a heartbeat loop, the eventbus in LoadManager will post
-   * the different NodeStatstics event to SyncManager and ClusterSchemaManager.
-   *
-   * @param nodeStatisticsEvent nodeStatistics that changed in a heartbeat loop
-   */
-  @Subscribe
-  @AllowConcurrentEvents
-  public void handleNodeStatistics(NodeStatisticsEvent nodeStatisticsEvent) {
-    // TODO
-  }
-
-  // endregion
-
-  private ConsensusManager getConsensusManager() {
-    return configManager.getConsensusManager();
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index a19c942d25..932f4039cc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -105,7 +105,6 @@ public class LoadManager {
     this.routeBalancer = new RouteBalancer(configManager);
 
     eventBus.register(configManager.getClusterSchemaManager());
-    eventBus.register(configManager.getSyncManager());
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 97125db05a..bcf116737b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -233,7 +233,8 @@ public class NodeManager {
   }
 
   private TRuntimeConfiguration getRuntimeConfiguration() {
-    getPipeManager().getPipePluginCoordinator().getPipePluginInfo().acquirePipePluginInfoLock();
+    getPipeManager().getPipePluginCoordinator().lock();
+    getPipeManager().getPipeTaskCoordinator().lock();
     getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
     getUDFManager().getUdfInfo().acquireUDFTableLock();
 
@@ -252,7 +253,8 @@ public class NodeManager {
     } finally {
       getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
       getUDFManager().getUdfInfo().releaseUDFTableLock();
-      getPipeManager().getPipePluginCoordinator().getPipePluginInfo().releasePipePluginInfoLock();
+      getPipeManager().getPipeTaskCoordinator().unlock();
+      getPipeManager().getPipePluginCoordinator().unlock();
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
index dbe95bab31..648d9eaff2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
@@ -22,21 +22,30 @@ package org.apache.iotdb.confignode.manager.pipe;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class PipeManager {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PipeManager.class);
-
   private final PipePluginCoordinator pipePluginCoordinator;
 
+  private final PipeTaskCoordinator pipeTaskCoordinator;
+
+  private final PipeInfo pipeInfo;
+
   public PipeManager(ConfigManager configManager, PipeInfo pipeInfo) {
     this.pipePluginCoordinator =
         new PipePluginCoordinator(configManager, pipeInfo.getPipePluginInfo());
+    this.pipeTaskCoordinator = new PipeTaskCoordinator(configManager, pipeInfo.getPipeTaskInfo());
+    this.pipeInfo = pipeInfo;
   }
 
   public PipePluginCoordinator getPipePluginCoordinator() {
     return pipePluginCoordinator;
   }
+
+  public PipeTaskCoordinator getPipeTaskCoordinator() {
+    return pipeTaskCoordinator;
+  }
+
+  public PipeInfo getPipeInfo() {
+    return pipeInfo;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
index 9e45a41dce..91da899463 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
@@ -55,6 +55,14 @@ public class PipePluginCoordinator {
     return pipePluginInfo;
   }
 
+  public void lock() {
+    pipePluginInfo.acquirePipePluginInfoLock();
+  }
+
+  public void unlock() {
+    pipePluginInfo.releasePipePluginInfoLock();
+  }
+
   public TSStatus createPipePlugin(TCreatePipePluginReq req) {
     final String pluginName = req.getPluginName().toUpperCase();
     final String className = req.getClassName();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
new file mode 100644
index 0000000000..908bd643f3
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
+
+public class PipeTaskCoordinator {
+
+  private final ConfigManager configManager;
+  private final PipeTaskInfo pipeTaskInfo;
+
+  public PipeTaskCoordinator(ConfigManager configManager, PipeTaskInfo pipeTaskInfo) {
+    this.configManager = configManager;
+    this.pipeTaskInfo = pipeTaskInfo;
+  }
+
+  public void lock() {
+    pipeTaskInfo.acquirePipeTaskInfoLock();
+  }
+
+  public void unlock() {
+    pipeTaskInfo.releasePipeTaskInfoLock();
+  }
+
+  public TSStatus createPipe(TCreatePipeReq req) {
+    return configManager.getProcedureManager().createPipe(req);
+  }
+
+  public TSStatus startPipe(String pipeName) {
+    return configManager.getProcedureManager().startPipe(pipeName);
+  }
+
+  public TSStatus stopPipe(String pipeName) {
+    return configManager.getProcedureManager().stopPipe(pipeName);
+  }
+
+  public TSStatus dropPipe(String pipeName) {
+    return configManager.getProcedureManager().dropPipe(pipeName);
+  }
+
+  public TGetAllPipeInfoResp showPipes() {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 954d7489a9..1721eef36e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -76,6 +76,9 @@ import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchem
 import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -83,14 +86,6 @@ import org.apache.iotdb.confignode.consensus.request.write.quota.SetThrottleQuot
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
@@ -115,7 +110,6 @@ import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
 import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
-import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -157,7 +151,6 @@ public class ConfigPlanExecutor {
   private final UDFInfo udfInfo;
 
   private final TriggerInfo triggerInfo;
-  private final ClusterSyncInfo syncInfo;
 
   private final CQInfo cqInfo;
 
@@ -175,7 +168,6 @@ public class ConfigPlanExecutor {
       ProcedureInfo procedureInfo,
       UDFInfo udfInfo,
       TriggerInfo triggerInfo,
-      ClusterSyncInfo syncInfo,
       CQInfo cqInfo,
       ModelInfo modelInfo,
       PipeInfo pipeInfo,
@@ -201,9 +193,6 @@ public class ConfigPlanExecutor {
     this.udfInfo = udfInfo;
     this.snapshotProcessorList.add(udfInfo);
 
-    this.syncInfo = syncInfo;
-    this.snapshotProcessorList.add(syncInfo);
-
     this.cqInfo = cqInfo;
     this.snapshotProcessorList.add(cqInfo);
 
@@ -258,10 +247,6 @@ public class ConfigPlanExecutor {
         return clusterSchemaInfo.getAllTemplateSetInfo();
       case GetTemplateSetInfo:
         return clusterSchemaInfo.getTemplateSetInfo((GetTemplateSetInfoPlan) req);
-      case GetPipeSink:
-        return syncInfo.getPipeSink((GetPipeSinkPlan) req);
-      case ShowPipe:
-        return syncInfo.showPipe((ShowPipePlan) req);
       case GetTriggerTable:
         return triggerInfo.getTriggerTable((GetTriggerTablePlan) req);
       case GetTriggerLocation:
@@ -394,18 +379,12 @@ public class ConfigPlanExecutor {
         return clusterSchemaInfo.unsetSchemaTemplate((UnsetSchemaTemplatePlan) physicalPlan);
       case DropSchemaTemplate:
         return clusterSchemaInfo.dropSchemaTemplate((DropSchemaTemplatePlan) physicalPlan);
-      case CreatePipeSink:
-        return syncInfo.addPipeSink((CreatePipeSinkPlan) physicalPlan);
-      case DropPipeSink:
-        return syncInfo.dropPipeSink((DropPipeSinkPlan) physicalPlan);
-      case PreCreatePipe:
-        return syncInfo.preCreatePipe((PreCreatePipePlan) physicalPlan);
-      case SetPipeStatus:
-        return syncInfo.setPipeStatus((SetPipeStatusPlan) physicalPlan);
-      case DropPipe:
-        return syncInfo.dropPipe((DropPipePlan) physicalPlan);
-      case RecordPipeMessage:
-        return syncInfo.recordPipeMessage((RecordPipeMessagePlan) physicalPlan);
+      case CreatePipeV2:
+        return pipeInfo.getPipeTaskInfo().createPipe((CreatePipePlanV2) physicalPlan);
+      case SetPipeStatusV2:
+        return pipeInfo.getPipeTaskInfo().setPipeStatus((SetPipeStatusPlanV2) physicalPlan);
+      case DropPipeV2:
+        return pipeInfo.getPipeTaskInfo().dropPipe((DropPipePlanV2) physicalPlan);
       case ADD_CQ:
         return cqInfo.addCQ((AddCQPlan) physicalPlan);
       case DROP_CQ:
@@ -426,6 +405,15 @@ public class ConfigPlanExecutor {
         return pipeInfo.getPipePluginInfo().createPipePlugin((CreatePipePluginPlan) physicalPlan);
       case DropPipePlugin:
         return pipeInfo.getPipePluginInfo().dropPipePlugin((DropPipePluginPlan) physicalPlan);
+      case CreatePipeSinkV1:
+      case DropPipeV1:
+      case DropPipeSinkV1:
+      case GetPipeSinkV1:
+      case PreCreatePipeV1:
+      case RecordPipeMessageV1:
+      case SetPipeStatusV1:
+      case ShowPipeV1:
+        return new TSStatus(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode());
       case setSpaceQuota:
         return quotaInfo.setSpaceQuota((SetSpaceQuotaPlan) physicalPlan);
       case setThrottleQuota:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index c881b0a646..733dc4c776 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -23,26 +23,81 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Objects;
 
 public class PipeInfo implements SnapshotProcessor {
 
   private final PipePluginInfo pipePluginInfo;
+  private final PipeTaskInfo pipeTaskInfo;
 
   public PipeInfo() throws IOException {
     pipePluginInfo = new PipePluginInfo();
+    pipeTaskInfo = new PipeTaskInfo();
   }
 
   public PipePluginInfo getPipePluginInfo() {
     return pipePluginInfo;
   }
 
+  public PipeTaskInfo getPipeTaskInfo() {
+    return pipeTaskInfo;
+  }
+
+  /////////////////////////////////  SnapshotProcessor  /////////////////////////////////
+
   @Override
   public boolean processTakeSnapshot(File snapshotDir) throws IOException {
-    return pipePluginInfo.processTakeSnapshot(snapshotDir);
+    pipeTaskInfo.acquirePipeTaskInfoLock();
+    pipePluginInfo.acquirePipePluginInfoLock();
+    try {
+      return pipeTaskInfo.processTakeSnapshot(snapshotDir)
+          && pipePluginInfo.processTakeSnapshot(snapshotDir);
+    } finally {
+      pipePluginInfo.releasePipePluginInfoLock();
+      pipeTaskInfo.releasePipeTaskInfoLock();
+    }
   }
 
   @Override
   public void processLoadSnapshot(File snapshotDir) throws IOException {
-    pipePluginInfo.processLoadSnapshot(snapshotDir);
+    pipeTaskInfo.acquirePipeTaskInfoLock();
+    pipePluginInfo.acquirePipePluginInfoLock();
+    try {
+      pipeTaskInfo.processLoadSnapshot(snapshotDir);
+      pipePluginInfo.processLoadSnapshot(snapshotDir);
+    } finally {
+      pipePluginInfo.releasePipePluginInfoLock();
+      pipeTaskInfo.releasePipeTaskInfoLock();
+    }
+  }
+
+  /////////////////////////////////  equals & hashCode  /////////////////////////////////
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PipeInfo pipeInfo = (PipeInfo) o;
+    return Objects.equals(pipePluginInfo, pipeInfo.pipePluginInfo)
+        && Objects.equals(pipeTaskInfo, pipeInfo.pipeTaskInfo);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(pipePluginInfo, pipeTaskInfo);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeInfo{"
+        + "pipePluginInfo="
+        + pipePluginInfo
+        + ", pipeTaskInfo="
+        + pipeTaskInfo
+        + '}';
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 4a56f79328..0b47dccb6c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePip
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTableResp;
 import org.apache.iotdb.confignode.consensus.response.udf.JarResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -48,6 +49,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class PipePluginInfo implements SnapshotProcessor {
@@ -110,6 +112,11 @@ public class PipePluginInfo implements SnapshotProcessor {
     return !pipePluginMetaKeeper.containsJar(jarName);
   }
 
+  public boolean checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
+    // TODO: validate the plugins in the create pipe Req.
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
   /////////////////////////////// Pipe Plugin Management ///////////////////////////////
 
   public TSStatus createPipePlugin(CreatePipePluginPlan physicalPlan) {
@@ -185,11 +192,8 @@ public class PipePluginInfo implements SnapshotProcessor {
       return false;
     }
 
-    acquirePipePluginInfoLock();
     try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile)) {
       pipePluginMetaKeeper.processTakeSnapshot(fileOutputStream);
-    } finally {
-      releasePipePluginInfoLock();
     }
     return true;
   }
@@ -204,11 +208,40 @@ public class PipePluginInfo implements SnapshotProcessor {
       return;
     }
 
-    acquirePipePluginInfoLock();
     try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
       pipePluginMetaKeeper.processLoadSnapshot(fileInputStream);
-    } finally {
-      releasePipePluginInfoLock();
     }
   }
+
+  /////////////////////////////// hashCode & equals ///////////////////////////////
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(pipePluginMetaKeeper, pipePluginExecutableManager);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    PipePluginInfo other = (PipePluginInfo) obj;
+    return Objects.equals(pipePluginExecutableManager, other.pipePluginExecutableManager)
+        && Objects.equals(pipePluginMetaKeeper, other.pipePluginMetaKeeper);
+  }
+
+  @Override
+  public String toString() {
+    return "PipePluginInfo [pipePluginMetaKeeper="
+        + pipePluginMetaKeeper
+        + ", pipePluginExecutableManager="
+        + pipePluginExecutableManager
+        + "]";
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
new file mode 100644
index 0000000000..9ea25320ff
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.persistence.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class PipeTaskInfo implements SnapshotProcessor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskInfo.class);
+  private static final String SNAPSHOT_FILE_NAME = "pipe_task_info.bin";
+
+  private final ReentrantLock pipeTaskInfoLock = new ReentrantLock();
+
+  private final PipeMetaKeeper pipeMetaKeeper;
+
+  public PipeTaskInfo() {
+    this.pipeMetaKeeper = new PipeMetaKeeper();
+  }
+
+  /////////////////////////////// Lock ///////////////////////////////
+
+  public void acquirePipeTaskInfoLock() {
+    pipeTaskInfoLock.lock();
+  }
+
+  public void releasePipeTaskInfoLock() {
+    pipeTaskInfoLock.unlock();
+  }
+
+  /////////////////////////////// Validator ///////////////////////////////
+
+  public boolean checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
+    if (!isPipeExisted(createPipeRequest.getPipeName())) {
+      return true;
+    }
+
+    LOGGER.info(
+        String.format(
+            "Failed to create pipe [%s], the pipe with the same name has been created",
+            createPipeRequest.getPipeName()));
+    return false;
+  }
+
+  public boolean checkBeforeStartPipe(String pipeName) {
+    if (!isPipeExisted(pipeName)) {
+      LOGGER.info(String.format("Failed to start pipe [%s], the pipe does not exist", pipeName));
+      return false;
+    }
+
+    if (getPipeStatus(pipeName) == PipeStatus.RUNNING) {
+      LOGGER.info(
+          String.format("Failed to start pipe [%s], the pipe is already running", pipeName));
+      return false;
+    }
+
+    return true;
+  }
+
+  public boolean checkBeforeStopPipe(String pipeName) {
+    if (!isPipeExisted(pipeName)) {
+      LOGGER.info(String.format("Failed to stop pipe [%s], the pipe does not exist", pipeName));
+      return false;
+    }
+
+    if (getPipeStatus(pipeName) == PipeStatus.STOPPED) {
+      LOGGER.info(String.format("Failed to stop pipe [%s], the pipe is already stop", pipeName));
+      return false;
+    }
+
+    return true;
+  }
+
+  public boolean checkBeforeDropPipe(String pipeName) {
+    if (isPipeExisted(pipeName)) {
+      return true;
+    }
+
+    LOGGER.info(String.format("Failed to drop pipe [%s], the pipe does not exist", pipeName));
+    return false;
+  }
+
+  private boolean isPipeExisted(String pipeName) {
+    return pipeMetaKeeper.containsPipeMeta(pipeName);
+  }
+
+  private PipeStatus getPipeStatus(String pipeName) {
+    return pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().getStatus().get();
+  }
+
+  /////////////////////////////// Pipe Task Management ///////////////////////////////
+
+  public TSStatus createPipe(CreatePipePlanV2 plan) {
+    pipeMetaKeeper.addPipeMeta(
+        plan.getPipeStaticMeta().getPipeName(),
+        new PipeMeta(plan.getPipeStaticMeta(), plan.getPipeRuntimeMeta()));
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  public TSStatus setPipeStatus(SetPipeStatusPlanV2 plan) {
+    pipeMetaKeeper
+        .getPipeMeta(plan.getPipeName())
+        .getRuntimeMeta()
+        .getStatus()
+        .set(plan.getPipeStatus());
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  public TSStatus dropPipe(DropPipePlanV2 plan) {
+    pipeMetaKeeper.removePipeMeta(plan.getPipeName());
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  /////////////////////////////// Snapshot ///////////////////////////////
+
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+    File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
+    if (snapshotFile.exists() && snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to take snapshot, because snapshot file [{}] is already exist.",
+          snapshotFile.getAbsolutePath());
+      return false;
+    }
+
+    try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile)) {
+      pipeMetaKeeper.processTakeSnapshot(fileOutputStream);
+    }
+    return true;
+  }
+
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws IOException {
+    File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
+    if (!snapshotFile.exists() || !snapshotFile.isFile()) {
+      LOGGER.error(
+          "Failed to load snapshot,snapshot file [{}] is not exist.",
+          snapshotFile.getAbsolutePath());
+      return;
+    }
+
+    try (FileInputStream fileInputStream = new FileInputStream(snapshotFile)) {
+      pipeMetaKeeper.processLoadSnapshot(fileInputStream);
+    }
+  }
+
+  /////////////////////////////// hashCode & equals ///////////////////////////////
+
+  @Override
+  public int hashCode() {
+    return pipeMetaKeeper.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeTaskInfo other = (PipeTaskInfo) obj;
+    return pipeMetaKeeper.equals(other.pipeMetaKeeper);
+  }
+
+  @Override
+  public String toString() {
+    return pipeMetaKeeper.toString();
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
similarity index 84%
copy from node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
index 04653122c2..ee42bc2462 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskOperation.java
@@ -17,6 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.task.meta;
+package org.apache.iotdb.confignode.persistence.pipe;
 
-public class PipeTaskMetaAccessor {}
+public enum PipeTaskOperation {
+  CREATE_PIPE,
+  START_PIPE,
+  STOP_PIPE,
+  DROP_PIPE,
+  ;
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
deleted file mode 100644
index 43ecde07c1..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/sync/ClusterSyncInfo.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.persistence.sync;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.sync.PipeException;
-import org.apache.iotdb.commons.exception.sync.PipeNotExistException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkNotExistException;
-import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
-import org.apache.iotdb.commons.sync.metadata.SyncMetadata;
-import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
-import org.apache.iotdb.confignode.consensus.response.pipe.PipeResp;
-import org.apache.iotdb.confignode.consensus.response.pipe.PipeSinkResp;
-import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class ClusterSyncInfo implements SnapshotProcessor {
-
-  protected static final Logger LOGGER = LoggerFactory.getLogger(ClusterSyncInfo.class);
-
-  private final SyncMetadata syncMetadata;
-
-  private final ReentrantLock syncMetadataLock = new ReentrantLock();
-
-  public ClusterSyncInfo() {
-    syncMetadata = new SyncMetadata();
-  }
-  // ======================================================
-  // region Implement of PipeSink
-  // ======================================================
-
-  /**
-   * Check PipeSink before create operation
-   *
-   * @param createPipeSinkPlan createPipeSinkPlan
-   * @throws PipeSinkException if there is PipeSink with the same name exists or attributes is
-   *     unsupported
-   */
-  public void checkAddPipeSink(CreatePipeSinkPlan createPipeSinkPlan) throws PipeSinkException {
-    // check no exist
-    syncMetadata.checkPipeSinkNoExist(createPipeSinkPlan.getPipeSinkInfo().getPipeSinkName());
-    // check attributes
-    SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(createPipeSinkPlan.getPipeSinkInfo());
-  }
-
-  public TSStatus addPipeSink(CreatePipeSinkPlan plan) {
-    TSStatus status = new TSStatus();
-    try {
-      syncMetadata.addPipeSink(SyncPipeUtil.parseTPipeSinkInfoAsPipeSink(plan.getPipeSinkInfo()));
-      status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (PipeSinkException e) {
-      LOGGER.error("failed to execute CreatePipeSinkPlan {} on ClusterSyncInfo", plan, e);
-      status.setCode(TSStatusCode.CREATE_PIPE_SINK_ERROR.getStatusCode());
-      LOGGER.error(e.getMessage());
-    }
-    return status;
-  }
-
-  /**
-   * Check PipeSink before drop operation
-   *
-   * @param pipeSinkName name
-   * @throws PipeSinkException if PipeSink is being used or does not exist
-   */
-  public void checkDropPipeSink(String pipeSinkName) throws PipeSinkException {
-    syncMetadata.checkDropPipeSink(pipeSinkName);
-  }
-
-  public TSStatus dropPipeSink(DropPipeSinkPlan plan) {
-    syncMetadata.dropPipeSink(plan.getPipeSinkName());
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  public PipeSinkResp getPipeSink(GetPipeSinkPlan plan) {
-    PipeSinkResp resp = new PipeSinkResp();
-    if (StringUtils.isEmpty(plan.getPipeSinkName())) {
-      resp.setPipeSinkList(syncMetadata.getAllPipeSink());
-    } else {
-      resp.setPipeSinkList(
-          Collections.singletonList(syncMetadata.getPipeSink(plan.getPipeSinkName())));
-    }
-    resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
-    return resp;
-  }
-
-  // endregion
-
-  // ======================================================
-  // region Implement of Pipe
-  // ======================================================
-
-  /**
-   * Check Pipe before create operation
-   *
-   * @param pipeInfo pipe info
-   * @throws PipeException if there is Pipe with the same name exists or PipeSink does not exist
-   */
-  public void checkAddPipe(PipeInfo pipeInfo) throws PipeException, PipeSinkNotExistException {
-    syncMetadata.checkAddPipe(pipeInfo);
-  }
-
-  public TSStatus preCreatePipe(PreCreatePipePlan physicalPlan) {
-    syncMetadata.addPipe(physicalPlan.getPipeInfo());
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  public TSStatus setPipeStatus(SetPipeStatusPlan physicalPlan) {
-    syncMetadata.setPipeStatus(physicalPlan.getPipeName(), physicalPlan.getPipeStatus());
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  public TSStatus dropPipe(DropPipePlan physicalPlan) {
-    syncMetadata.dropPipe(physicalPlan.getPipeName());
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  public TSStatus recordPipeMessage(RecordPipeMessagePlan physicalPlan) {
-    syncMetadata.changePipeMessage(
-        physicalPlan.getPipeName(), physicalPlan.getPipeMessage().getType());
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
-  public PipeResp showPipe(ShowPipePlan plan) {
-    PipeResp resp = new PipeResp();
-    if (StringUtils.isEmpty(plan.getPipeName())) {
-      // show all
-      resp.setPipeInfoList(syncMetadata.getAllPipeInfos());
-    } else {
-      // show specific pipe
-      resp.setPipeInfoList(Collections.singletonList(syncMetadata.getPipeInfo(plan.getPipeName())));
-    }
-    resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
-    return resp;
-  }
-
-  /**
-   * Get PipeInfo by pipeName. Check before start, stop and drop operation
-   *
-   * @param pipeName pipe name
-   * @throws PipeNotExistException if there is Pipe does not exist
-   */
-  public PipeInfo getPipeInfo(String pipeName) throws PipeNotExistException {
-    PipeInfo pipeInfo = syncMetadata.getPipeInfo(pipeName);
-    if (pipeInfo == null) {
-      throw new PipeNotExistException(pipeName);
-    }
-    return pipeInfo;
-  }
-
-  public List<PipeInfo> getAllPipeInfos() {
-    return syncMetadata.getAllPipeInfos();
-  }
-
-  // endregion
-
-  // ======================================================
-  // region Implement of Lock and Unlock
-  // ======================================================
-
-  public void lockSyncMetadata() {
-    LOGGER.info("Lock SyncMetadata");
-    syncMetadataLock.lock();
-    LOGGER.info("Acquire SyncMetadata lock");
-  }
-
-  public void unlockSyncMetadata() {
-    LOGGER.info("Unlock SyncMetadata");
-    syncMetadataLock.unlock();
-  }
-
-  // endregion
-
-  // ======================================================
-  // region Implement of Snapshot
-  // ======================================================
-
-  @Override
-  public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
-    return syncMetadata.processTakeSnapshot(snapshotDir);
-  }
-
-  @Override
-  public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
-    syncMetadata.processLoadSnapshot(snapshotDir);
-  }
-
-  // endregion
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index efa53d330d..d722d37f0d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
@@ -61,6 +62,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
@@ -68,6 +70,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -656,6 +659,28 @@ public class ConfigNodeProcedureEnv {
     return clientHandler.getResponseList();
   }
 
+  public List<TSStatus> createPipeOnDataNodes(PipeMeta pipeMeta) throws IOException {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+
+    TCreatePipeOnDataNodeReq request =
+        new TCreatePipeOnDataNodeReq().setPipeMeta(pipeMeta.serialize());
+    final AsyncClientHandler<TCreatePipeOnDataNodeReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.CREATE_PIPE, request, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
+  }
+
+  public List<TSStatus> operatePipeOnDataNodes(TOperatePipeOnDataNodeReq request) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+
+    final AsyncClientHandler<TOperatePipeOnDataNodeReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.OPERATE_PIPE, request, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
+  }
+
   public LockQueue getNodeLock() {
     return nodeLock;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index 7d2a494fc7..b479066127 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
+import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
@@ -116,14 +116,18 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP
 
   private Flow executeFromLock(ConfigNodeProcedureEnv env) {
     LOGGER.info("CreatePipePluginProcedure: executeFromLock({})", pipePluginMeta.getPluginName());
-    final PipePluginInfo pipePluginInfo =
-        env.getConfigManager().getPipeManager().getPipePluginCoordinator().getPipePluginInfo();
+    final PipePluginCoordinator pipePluginCoordinator =
+        env.getConfigManager().getPipeManager().getPipePluginCoordinator();
 
-    pipePluginInfo.acquirePipePluginInfoLock();
+    pipePluginCoordinator.lock();
 
     try {
-      pipePluginInfo.validateBeforeCreatingPipePlugin(
-          pipePluginMeta.getPluginName(), pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5());
+      pipePluginCoordinator
+          .getPipePluginInfo()
+          .validateBeforeCreatingPipePlugin(
+              pipePluginMeta.getPluginName(),
+              pipePluginMeta.getJarName(),
+              pipePluginMeta.getJarMD5());
     } catch (PipeManagementException e) {
       // The pipe plugin has already created, we should end the procedure
       LOGGER.warn(
@@ -131,7 +135,7 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP
           pipePluginMeta.getPluginName(),
           pipePluginMeta.getPluginName());
       setFailure(new ProcedureException(e.getMessage()));
-      pipePluginInfo.releasePipePluginInfoLock();
+      pipePluginCoordinator.unlock();
       return Flow.NO_MORE_STATE;
     }
 
@@ -186,11 +190,7 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP
   private Flow executeFromUnlock(ConfigNodeProcedureEnv env) {
     LOGGER.info("CreatePipePluginProcedure: executeFromUnlock({})", pipePluginMeta.getPluginName());
 
-    env.getConfigManager()
-        .getPipeManager()
-        .getPipePluginCoordinator()
-        .getPipePluginInfo()
-        .releasePipePluginInfoLock();
+    env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
 
     return Flow.NO_MORE_STATE;
   }
@@ -214,11 +214,7 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP
   private void rollbackFromLock(ConfigNodeProcedureEnv env) {
     LOGGER.info("CreatePipePluginProcedure: rollbackFromLock({})", pipePluginMeta.getPluginName());
 
-    env.getConfigManager()
-        .getPipeManager()
-        .getPipePluginCoordinator()
-        .getPipePluginInfo()
-        .releasePipePluginInfoLock();
+    env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
   }
 
   private void rollbackFromCreateOnConfigNodes(ConfigNodeProcedureEnv env) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 815782a0ac..6aa4d6109d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
 
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
-import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
+import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
@@ -102,18 +102,18 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
 
   private Flow executeFromLock(ConfigNodeProcedureEnv env) {
     LOGGER.info("DropPipePluginProcedure: executeFromLock({})", pluginName);
-    final PipePluginInfo pipePluginInfo =
-        env.getConfigManager().getPipeManager().getPipePluginCoordinator().getPipePluginInfo();
+    final PipePluginCoordinator pipePluginCoordinator =
+        env.getConfigManager().getPipeManager().getPipePluginCoordinator();
 
-    pipePluginInfo.acquirePipePluginInfoLock();
+    pipePluginCoordinator.lock();
 
     try {
-      pipePluginInfo.validateBeforeDroppingPipePlugin(pluginName);
+      pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
     } catch (PipeManagementException e) {
       // if the pipe plugin is not exist, we should end the procedure
       LOGGER.warn(e.getMessage());
       setFailure(new ProcedureException(e.getMessage()));
-      pipePluginInfo.releasePipePluginInfoLock();
+      pipePluginCoordinator.unlock();
       return Flow.NO_MORE_STATE;
     }
 
@@ -149,11 +149,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
   private Flow executeFromUnlock(ConfigNodeProcedureEnv env) {
     LOGGER.info("DropPipePluginProcedure: executeFromUnlock({})", pluginName);
 
-    env.getConfigManager()
-        .getPipeManager()
-        .getPipePluginCoordinator()
-        .getPipePluginInfo()
-        .releasePipePluginInfoLock();
+    env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
 
     return Flow.NO_MORE_STATE;
   }
@@ -177,11 +173,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
   private void rollbackFromLock(ConfigNodeProcedureEnv env) {
     LOGGER.info("DropPipePluginProcedure: rollbackFromLock({})", pluginName);
 
-    env.getConfigManager()
-        .getPipeManager()
-        .getPipePluginCoordinator()
-        .getPipePluginInfo()
-        .releasePipePluginInfoLock();
+    env.getConfigManager().getPipeManager().getPipePluginCoordinator().unlock();
   }
 
   private void rollbackFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
new file mode 100644
index 0000000000..b840d94591
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+
+import org.apache.iotdb.commons.exception.sync.PipeException;
+import org.apache.iotdb.commons.exception.sync.PipeSinkException;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
+import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Base state machine that manages 4 kinds of PIPE operations: CREATE, START, STOP and DROP.
+ *
+ * <p>This class extends AbstractNodeProcedure to make sure that pipe task procedures can be
+ * executed in sequence and node procedures can be locked when a pipe task procedure is running.
+ *
+ * <p>A procedure walks through VALIDATE_TASK, CALCULATE_INFO_FOR_TASK,
+ * WRITE_CONFIG_NODE_CONSENSUS and OPERATE_ON_DATA_NODES, in that order. The pipe task coordinator
+ * lock is acquired on entering VALIDATE_TASK and released when validation ends the procedure
+ * early, when OPERATE_ON_DATA_NODES completes, or when VALIDATE_TASK is rolled back.
+ */
+abstract class AbstractOperatePipeProcedureV2 extends AbstractNodeProcedure<OperatePipeTaskState> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(AbstractOperatePipeProcedureV2.class);
+
+  /** Cycle count above which a non-rollbackable state stops retrying and fails the procedure. */
+  private static final int RETRY_THRESHOLD = 3;
+
+  /** Returns the pipe operation implemented by the concrete procedure (used in messages). */
+  abstract PipeTaskOperation getOperation();
+
+  /**
+   * Execute at state VALIDATE_TASK
+   *
+   * @return true if the procedure should move on to CALCULATE_INFO_FOR_TASK, false if it should
+   *     finish directly (the lock is released and no further state is executed)
+   */
+  abstract boolean executeFromValidateTask(ConfigNodeProcedureEnv env)
+      throws PipeException, PipeSinkException;
+
+  /** Execute at state CALCULATE_INFO_FOR_TASK */
+  abstract void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeException;
+
+  /** Execute at state WRITE_CONFIG_NODE_CONSENSUS */
+  abstract void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
+      throws PipeException;
+
+  /** Execute at state OPERATE_ON_DATA_NODES */
+  abstract void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
+      throws PipeException, IOException;
+
+  /**
+   * Runs the step that belongs to {@code state} and selects the next state.
+   *
+   * @return {@link Flow#NO_MORE_STATE} when validation asks to finish directly or when the last
+   *     state completed, {@link Flow#HAS_MORE_STATE} otherwise (including after a failure has
+   *     been recorded through {@code setFailure})
+   */
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState state)
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+    try {
+      switch (state) {
+        case VALIDATE_TASK:
+          env.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
+          if (!executeFromValidateTask(env)) {
+            env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+            return Flow.NO_MORE_STATE;
+          }
+          setNextState(OperatePipeTaskState.CALCULATE_INFO_FOR_TASK);
+          break;
+        case CALCULATE_INFO_FOR_TASK:
+          executeFromCalculateInfoForTask(env);
+          setNextState(OperatePipeTaskState.WRITE_CONFIG_NODE_CONSENSUS);
+          break;
+        case WRITE_CONFIG_NODE_CONSENSUS:
+          executeFromWriteConfigNodeConsensus(env);
+          setNextState(OperatePipeTaskState.OPERATE_ON_DATA_NODES);
+          break;
+        case OPERATE_ON_DATA_NODES:
+          executeFromOperateOnDataNodes(env);
+          env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+          return Flow.NO_MORE_STATE;
+      }
+    } catch (PipeException | PipeSinkException | IOException e) {
+      handleExecutionFailure(state, e);
+    } catch (RuntimeException e) {
+      // The concrete procedures report failures with unchecked pipe exceptions (they throw them
+      // from methods that declare no checked exception), which the clause above does not match.
+      // Route them through the same failure handling so that the rollback path runs.
+      handleExecutionFailure(state, e);
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Records a failure raised while executing {@code state}.
+   *
+   * <p>If the state supports rollback the procedure is failed immediately. Otherwise the error is
+   * only logged until the cycle count exceeds {@link #RETRY_THRESHOLD}, at which point the
+   * procedure is failed as well.
+   */
+  private void handleExecutionFailure(OperatePipeTaskState state, Exception e) {
+    if (isRollbackSupported(state)) {
+      LOGGER.error("Fail in OperatePipeProcedure", e);
+      setFailure(new ProcedureException(e.getMessage()));
+    } else {
+      LOGGER.error("Retrievable error trying to {} at state [{}]", getOperation(), state, e);
+      if (getCycles() > RETRY_THRESHOLD) {
+        setFailure(
+            new ProcedureException(
+                String.format("Fail to %s because %s", getOperation().name(), e.getMessage())));
+      }
+    }
+  }
+
+  /** Every state of a pipe task procedure can be rolled back. */
+  @Override
+  protected boolean isRollbackSupported(OperatePipeTaskState state) {
+    return true;
+  }
+
+  /**
+   * Dispatches the rollback of {@code state} to the matching hook. Rolling back VALIDATE_TASK
+   * additionally releases the pipe task coordinator lock taken when that state was executed.
+   */
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeTaskState state)
+      throws IOException, InterruptedException, ProcedureException {
+    switch (state) {
+      case VALIDATE_TASK:
+        rollbackFromValidateTask(env);
+        env.getConfigManager().getPipeManager().getPipeTaskCoordinator().unlock();
+        break;
+      case CALCULATE_INFO_FOR_TASK:
+        rollbackFromCalculateInfoForTask(env);
+        break;
+      case WRITE_CONFIG_NODE_CONSENSUS:
+        rollbackFromWriteConfigNodeConsensus(env);
+        break;
+      case OPERATE_ON_DATA_NODES:
+        rollbackFromOperateOnDataNodes(env);
+        break;
+      default:
+        LOGGER.error("Unsupported roll back STATE [{}]", state);
+    }
+  }
+
+  /** Undo the effects of state VALIDATE_TASK */
+  protected abstract void rollbackFromValidateTask(ConfigNodeProcedureEnv env);
+
+  /** Undo the effects of state CALCULATE_INFO_FOR_TASK */
+  protected abstract void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env);
+
+  /** Undo the effects of state WRITE_CONFIG_NODE_CONSENSUS */
+  protected abstract void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env);
+
+  /** Undo the effects of state OPERATE_ON_DATA_NODES */
+  protected abstract void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env);
+
+  /** Maps a state id back to its state; ids are the enum ordinals (see {@link #getStateId}). */
+  @Override
+  protected OperatePipeTaskState getState(int stateId) {
+    return OperatePipeTaskState.values()[stateId];
+  }
+
+  /** Uses the enum ordinal as the state id. */
+  @Override
+  protected int getStateId(OperatePipeTaskState state) {
+    return state.ordinal();
+  }
+
+  @Override
+  protected OperatePipeTaskState getInitialState() {
+    return OperatePipeTaskState.VALIDATE_TASK;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
new file mode 100644
index 0000000000..7b7c6def4e
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.task.meta.PipeConsensusGroupTaskMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
+import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Procedure that creates a pipe: it validates the request, builds the static and runtime meta of
+ * the pipe, writes a {@link CreatePipePlanV2} through the config node consensus layer and finally
+ * pushes the resulting {@link PipeMeta} to the data nodes.
+ */
+public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CreatePipeProcedureV2.class);
+
+  private TCreatePipeReq createPipeRequest;
+
+  // Both metas are produced at state CALCULATE_INFO_FOR_TASK and consumed by the later states.
+  private PipeStaticMeta pipeStaticMeta;
+  private PipeRuntimeMeta pipeRuntimeMeta;
+
+  /** No-arg constructor; the fields are expected to be filled in by {@link #deserialize}. */
+  public CreatePipeProcedureV2() {
+    super();
+  }
+
+  public CreatePipeProcedureV2(TCreatePipeReq createPipeRequest) throws PipeException {
+    super();
+    this.createPipeRequest = createPipeRequest;
+  }
+
+  @Override
+  PipeTaskOperation getOperation() {
+    return PipeTaskOperation.CREATE_PIPE;
+  }
+
+  /** Passes only if both the plugin info check and the task info check accept the request. */
+  @Override
+  boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info(
+        "CreatePipeProcedureV2: executeFromValidateTask({})", createPipeRequest.getPipeName());
+
+    final PipeInfo pipeInfo = env.getConfigManager().getPipeManager().getPipeInfo();
+    return pipeInfo.getPipePluginInfo().checkBeforeCreatePipe(createPipeRequest)
+        && pipeInfo.getPipeTaskInfo().checkBeforeCreatePipe(createPipeRequest);
+  }
+
+  /**
+   * Builds the static meta from the request (stamped with the current time) and a runtime meta
+   * holding one task entry per region found in the latest region leader map.
+   */
+  @Override
+  void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+    LOGGER.info(
+        "CreatePipeProcedureV2: executeFromCalculateInfoForTask({})",
+        createPipeRequest.getPipeName());
+
+    pipeStaticMeta =
+        new PipeStaticMeta(
+            createPipeRequest.getPipeName(),
+            System.currentTimeMillis(),
+            createPipeRequest.getCollectorAttributes(),
+            createPipeRequest.getProcessorAttributes(),
+            createPipeRequest.getConnectorAttributes());
+
+    final Map<TConsensusGroupId, PipeConsensusGroupTaskMeta> consensusGroupIdToTaskMetaMap =
+        new HashMap<>();
+    env.getConfigManager()
+        .getLoadManager()
+        .getLatestRegionLeaderMap()
+        .forEach(
+            (region, leader) ->
+                consensusGroupIdToTaskMetaMap.put(
+                    region, new PipeConsensusGroupTaskMeta(0, leader)));
+    pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
+  }
+
+  /**
+   * Persists the new pipe through consensus.
+   *
+   * @throws PipeManagementException if the consensus write is not successful
+   */
+  @Override
+  void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
+      throws PipeManagementException {
+    LOGGER.info(
+        "CreatePipeProcedureV2: executeFromWriteConfigNodeConsensus({})",
+        createPipeRequest.getPipeName());
+
+    final ConsensusWriteResponse response =
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta));
+    if (!response.isSuccessful()) {
+      throw new PipeManagementException(response.getErrorMessage());
+    }
+  }
+
+  /**
+   * Sends the pipe meta to the data nodes.
+   *
+   * @throws PipeManagementException if the squashed data node status is not SUCCESS_STATUS
+   */
+  @Override
+  void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
+      throws PipeManagementException, IOException {
+    LOGGER.info(
+        "CreatePipeProcedureV2: executeFromOperateOnDataNodes({})",
+        createPipeRequest.getPipeName());
+
+    if (RpcUtils.squashResponseStatusList(
+                env.createPipeOnDataNodes(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta)))
+            .getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeManagementException(
+          String.format(
+              "Failed to create pipe instance [%s] on data nodes",
+              createPipeRequest.getPipeName()));
+    }
+  }
+
+  @Override
+  protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info(
+        "CreatePipeProcedureV2: rollbackFromValidateTask({})", createPipeRequest.getPipeName());
+    // Do nothing
+  }
+
+  @Override
+  protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info(
+        "CreatePipeProcedureV2: rollbackFromCalculateInfoForTask({})",
+        createPipeRequest.getPipeName());
+    // Do nothing
+  }
+
+  /** Compensates the consensus write by writing a {@link DropPipePlanV2} for the same pipe. */
+  @Override
+  protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+    LOGGER.info(
+        "CreatePipeProcedureV2: rollbackFromWriteConfigNodeConsensus({})",
+        createPipeRequest.getPipeName());
+
+    final ConsensusWriteResponse response =
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new DropPipePlanV2(createPipeRequest.getPipeName()));
+    if (!response.isSuccessful()) {
+      throw new PipeManagementException(response.getErrorMessage());
+    }
+  }
+
+  /** Compensates the data node step by asking the data nodes to drop the pipe. */
+  @Override
+  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
+    LOGGER.info(
+        "CreatePipeProcedureV2: rollbackFromOperateOnDataNodes({})",
+        createPipeRequest.getPipeName());
+
+    final TOperatePipeOnDataNodeReq request =
+        new TOperatePipeOnDataNodeReq()
+            .setPipeName(createPipeRequest.getPipeName())
+            .setOperation((byte) PipeTaskOperation.DROP_PIPE.ordinal());
+    if (RpcUtils.squashResponseStatusList(env.operatePipeOnDataNodes(request)).getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeManagementException(
+          String.format(
+              "Failed to rollback from operate on data nodes for task [%s]",
+              createPipeRequest.getPipeName()));
+    }
+  }
+
+  /**
+   * Writes the procedure type code, the base procedure state, the pipe name, the three attribute
+   * maps (collector, processor, connector) and, behind a presence flag, the static meta.
+   *
+   * <p>NOTE(review): {@code pipeRuntimeMeta} is not written here, so it will presumably be null
+   * on a procedure restored after CALCULATE_INFO_FOR_TASK has run - confirm whether that is
+   * intended.
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.CREATE_PIPE_PROCEDURE_V2.getTypeCode());
+    super.serialize(stream);
+    ReadWriteIOUtils.write(createPipeRequest.getPipeName(), stream);
+    writeAttributes(createPipeRequest.getCollectorAttributes(), stream);
+    writeAttributes(createPipeRequest.getProcessorAttributes(), stream);
+    writeAttributes(createPipeRequest.getConnectorAttributes(), stream);
+    if (pipeStaticMeta != null) {
+      stream.writeBoolean(true);
+      pipeStaticMeta.serialize(stream);
+    } else {
+      stream.writeBoolean(false);
+    }
+  }
+
+  /**
+   * Writes an attribute map as an entry count followed by the key/value pairs. An absent (null)
+   * map is written as an empty one instead of failing with a NullPointerException.
+   */
+  private static void writeAttributes(Map<String, String> attributes, DataOutputStream stream)
+      throws IOException {
+    if (attributes == null) {
+      stream.writeInt(0);
+      return;
+    }
+    stream.writeInt(attributes.size());
+    for (Map.Entry<String, String> entry : attributes.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), stream);
+      ReadWriteIOUtils.write(entry.getValue(), stream);
+    }
+  }
+
+  /** Restores the fields in exactly the order in which {@link #serialize} wrote them. */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    final String pipeName = ReadWriteIOUtils.readString(byteBuffer);
+    final Map<String, String> collectorAttributes = readAttributes(byteBuffer);
+    final Map<String, String> processorAttributes = readAttributes(byteBuffer);
+    final Map<String, String> connectorAttributes = readAttributes(byteBuffer);
+    createPipeRequest =
+        new TCreatePipeReq()
+            .setPipeName(pipeName)
+            .setCollectorAttributes(collectorAttributes)
+            .setProcessorAttributes(processorAttributes)
+            .setConnectorAttributes(connectorAttributes);
+    if (ReadWriteIOUtils.readBool(byteBuffer)) {
+      pipeStaticMeta = PipeStaticMeta.deserialize(byteBuffer);
+    }
+  }
+
+  /** Reads an attribute map written by {@link #writeAttributes}. */
+  private static Map<String, String> readAttributes(ByteBuffer byteBuffer) {
+    final int size = byteBuffer.getInt();
+    final Map<String, String> attributes = new HashMap<>();
+    for (int i = 0; i < size; ++i) {
+      // The key precedes its value on the wire, so it must be read first.
+      final String key = ReadWriteIOUtils.readString(byteBuffer);
+      final String value = ReadWriteIOUtils.readString(byteBuffer);
+      attributes.put(key, value);
+    }
+    return attributes;
+  }
+
+  /** Returns the requested pipe name, or null while no request has been set. */
+  private String pipeNameOrNull() {
+    return createPipeRequest == null ? null : createPipeRequest.getPipeName();
+  }
+
+  /** Two create procedures are equal when they target the same pipe name. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CreatePipeProcedureV2 that = (CreatePipeProcedureV2) o;
+    return Objects.equals(pipeNameOrNull(), that.pipeNameOrNull());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(pipeNameOrNull());
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
new file mode 100644
index 0000000000..36bd95094d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DropPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DropPipeProcedureV2.class);
+
+  private String pipeName;
+
+  public DropPipeProcedureV2() {
+    super();
+  }
+
+  public DropPipeProcedureV2(String pipeName) throws PipeException {
+    super();
+    this.pipeName = pipeName;
+  }
+
+  @Override
+  PipeTaskOperation getOperation() {
+    return PipeTaskOperation.DROP_PIPE;
+  }
+
+  @Override
+  boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+    LOGGER.info("DropPipeProcedureV2: executeFromValidateTask({})", pipeName);
+
+    return env.getConfigManager()
+        .getPipeManager()
+        .getPipeInfo()
+        .getPipeTaskInfo()
+        .checkBeforeDropPipe(pipeName);
+  }
+
+  @Override
+  void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+    LOGGER.info("DropPipeProcedureV2: executeFromCalculateInfoForTask({})", pipeName);
+    // Do nothing
+  }
+
+  @Override
+  void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
+      throws PipeManagementException {
+    LOGGER.info("DropPipeProcedureV2: executeFromWriteConfigNodeConsensus({})", pipeName);
+
+    final ConsensusWriteResponse response =
+        env.getConfigManager().getConsensusManager().write(new DropPipePlanV2(pipeName));
+    if (!response.isSuccessful()) {
+      throw new PipeManagementException(response.getErrorMessage());
+    }
+  }
+
+  @Override
+  void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws PipeManagementException {
+    LOGGER.info("DropPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName);
+
+    final TOperatePipeOnDataNodeReq request =
+        new TOperatePipeOnDataNodeReq()
+            .setPipeName(pipeName)
+            .setOperation((byte) PipeTaskOperation.DROP_PIPE.ordinal());
+    if (RpcUtils.squashResponseStatusList(env.operatePipeOnDataNodes(request)).getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeManagementException(
+          String.format("Failed to drop pipe instance [%s] on data nodes", pipeName));
+    }
+  }
+
+  @Override
+  protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("DropPipeProcedureV2: rollbackFromValidateTask({})", pipeName);
+    // Do nothing
+  }
+
+  @Override
+  protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("DropPipeProcedureV2: rollbackFromCalculateInfoForTask({})", pipeName);
+    // Do nothing
+  }
+
+  @Override
+  protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+    LOGGER.info("DropPipeProcedureV2: rollbackFromWriteConfigNodeConsensus({})", pipeName);
+    // Do nothing
+  }
+
+  @Override
+  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
+    LOGGER.info("DropPipeProcedureV2: rollbackFromOperateOnDataNodes({})", pipeName);
+    // Do nothing
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.DROP_PIPE_PROCEDURE_V2.getTypeCode());
+    super.serialize(stream);
+    ReadWriteIOUtils.write(pipeName, stream);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    pipeName = ReadWriteIOUtils.readString(byteBuffer);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DropPipeProcedureV2 that = (DropPipeProcedureV2) o;
+    return pipeName.equals(that.pipeName);
+  }
+
+  @Override
+  public int hashCode() {
+    return pipeName.hashCode();
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
new file mode 100644
index 0000000000..786526714e
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Procedure that starts a pipe: it validates the request, writes a {@link SetPipeStatusPlanV2}
+ * with status RUNNING through the config node consensus layer and then asks the data nodes to
+ * start the pipe. On rollback the status is written back as STOPPED and the data nodes are asked
+ * to stop the pipe.
+ */
+public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(StartPipeProcedureV2.class);
+
+  private String pipeName;
+
+  /** No-arg constructor; {@code pipeName} is expected to be filled in by {@link #deserialize}. */
+  public StartPipeProcedureV2() {
+    super();
+  }
+
+  public StartPipeProcedureV2(String pipeName) throws PipeException {
+    super();
+    this.pipeName = pipeName;
+  }
+
+  @Override
+  PipeTaskOperation getOperation() {
+    return PipeTaskOperation.START_PIPE;
+  }
+
+  /** Delegates the decision to the pipe task info's {@code checkBeforeStartPipe}. */
+  @Override
+  boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+    LOGGER.info("StartPipeProcedureV2: executeFromValidateTask({})", pipeName);
+
+    return env.getConfigManager()
+        .getPipeManager()
+        .getPipeInfo()
+        .getPipeTaskInfo()
+        .checkBeforeStartPipe(pipeName);
+  }
+
+  @Override
+  void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+    LOGGER.info("StartPipeProcedureV2: executeFromCalculateInfoForTask({})", pipeName);
+    // Do nothing
+  }
+
+  /**
+   * Marks the pipe as RUNNING through consensus.
+   *
+   * @throws PipeManagementException if the consensus write is not successful
+   */
+  @Override
+  void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
+      throws PipeManagementException {
+    LOGGER.info("StartPipeProcedureV2: executeFromWriteConfigNodeConsensus({})", pipeName);
+
+    final ConsensusWriteResponse response =
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
+    if (!response.isSuccessful()) {
+      throw new PipeManagementException(response.getErrorMessage());
+    }
+  }
+
+  /**
+   * Asks the data nodes to start the pipe.
+   *
+   * @throws PipeManagementException if the squashed data node status is not SUCCESS_STATUS
+   */
+  @Override
+  void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws PipeManagementException {
+    LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName);
+
+    final TOperatePipeOnDataNodeReq request =
+        new TOperatePipeOnDataNodeReq()
+            .setPipeName(pipeName)
+            .setOperation((byte) PipeTaskOperation.START_PIPE.ordinal());
+    if (RpcUtils.squashResponseStatusList(env.operatePipeOnDataNodes(request)).getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeManagementException(
+          String.format("Failed to start pipe instance [%s] on data nodes", pipeName));
+    }
+  }
+
+  @Override
+  protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("StartPipeProcedureV2: rollbackFromValidateTask({})", pipeName);
+    // Do nothing
+  }
+
+  @Override
+  protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("StartPipeProcedureV2: rollbackFromCalculateInfoForTask({})", pipeName);
+    // Do nothing
+  }
+
+  /** Compensates the consensus write by writing the status back as STOPPED. */
+  @Override
+  protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+    LOGGER.info("StartPipeProcedureV2: rollbackFromWriteConfigNodeConsensus({})", pipeName);
+
+    final ConsensusWriteResponse response =
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
+    if (!response.isSuccessful()) {
+      throw new PipeManagementException(response.getErrorMessage());
+    }
+  }
+
+  /** Compensates the data node step by asking the data nodes to stop the pipe. */
+  @Override
+  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
+      throws PipeManagementException {
+    LOGGER.info("StartPipeProcedureV2: rollbackFromOperateOnDataNodes({})", pipeName);
+
+    final TOperatePipeOnDataNodeReq request =
+        new TOperatePipeOnDataNodeReq()
+            .setPipeName(pipeName)
+            .setOperation((byte) PipeTaskOperation.STOP_PIPE.ordinal());
+    if (RpcUtils.squashResponseStatusList(env.operatePipeOnDataNodes(request)).getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeManagementException(
+          String.format("Failed to rollback from start on data nodes for task [%s]", pipeName));
+    }
+  }
+
+  /** Writes the procedure type code, the base procedure state and then the pipe name. */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.START_PIPE_PROCEDURE_V2.getTypeCode());
+    super.serialize(stream);
+    ReadWriteIOUtils.write(pipeName, stream);
+  }
+
+  /** Restores the base procedure state and then the pipe name. */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    pipeName = ReadWriteIOUtils.readString(byteBuffer);
+  }
+
+  /** Two start procedures are equal when they target the same pipe name. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    StartPipeProcedureV2 that = (StartPipeProcedureV2) o;
+    // Null-safe: pipeName stays null on an instance built by the no-arg constructor.
+    return java.util.Objects.equals(pipeName, that.pipeName);
+  }
+
+  @Override
+  public int hashCode() {
+    // Same value as pipeName.hashCode() for a non-null name, 0 instead of a failure otherwise.
+    return java.util.Objects.hashCode(pipeName);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
new file mode 100644
index 0000000000..0bb231b96d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskOperation;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Procedure that stops a pipe task. It supplies the stop-specific stage hooks declared by {@link
+ * AbstractOperatePipeProcedureV2}: validation, the config node consensus write that marks the pipe
+ * {@link PipeStatus#STOPPED}, and the {@link PipeTaskOperation#STOP_PIPE} request sent to data
+ * nodes, plus the matching rollback hooks. The order in which the hooks run is decided by the base
+ * class.
+ */
+public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(StopPipeProcedureV2.class);
+
+  /** Name of the pipe to stop; null until the constructor or deserialize() sets it. */
+  private String pipeName;
+
+  /** Creates an instance whose pipe name stays unset until {@link #deserialize} is called. */
+  public StopPipeProcedureV2() {
+    super();
+  }
+
+  /**
+   * Creates a procedure that stops the given pipe.
+   *
+   * @param pipeName name of the pipe to stop
+   * @throws PipeException part of the existing signature; not raised by the statements here
+   */
+  public StopPipeProcedureV2(String pipeName) throws PipeException {
+    super();
+    this.pipeName = pipeName;
+  }
+
+  /** Identifies this procedure's operation as {@link PipeTaskOperation#STOP_PIPE}. */
+  @Override
+  PipeTaskOperation getOperation() {
+    return PipeTaskOperation.STOP_PIPE;
+  }
+
+  /**
+   * Delegates validation to the pipe task info's {@code checkBeforeStopPipe}.
+   *
+   * @return the result of {@code checkBeforeStopPipe}; how it is interpreted is up to the base class
+   */
+  @Override
+  boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+    LOGGER.info("StopPipeProcedureV2: executeFromValidateTask({})", pipeName);
+
+    return env.getConfigManager()
+        .getPipeManager()
+        .getPipeInfo()
+        .getPipeTaskInfo()
+        .checkBeforeStopPipe(pipeName);
+  }
+
+  /** Only logs the call; no info is calculated when stopping a pipe. */
+  @Override
+  void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeManagementException {
+    LOGGER.info("StopPipeProcedureV2: executeFromCalculateInfoForTask({})", pipeName);
+    // Do nothing
+  }
+
+  /**
+   * Writes a {@link SetPipeStatusPlanV2} with {@link PipeStatus#STOPPED} through the consensus
+   * manager.
+   *
+   * @throws PipeManagementException if the consensus write is not successful
+   */
+  @Override
+  void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
+      throws PipeManagementException {
+    LOGGER.info("StopPipeProcedureV2: executeFromWriteConfigNodeConsensus({})", pipeName);
+
+    final ConsensusWriteResponse response =
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
+    if (!response.isSuccessful()) {
+      throw new PipeManagementException(response.getErrorMessage());
+    }
+  }
+
+  /**
+   * Sends a {@link PipeTaskOperation#STOP_PIPE} request for this pipe to the data nodes.
+   *
+   * @throws PipeManagementException if the squashed response status is not SUCCESS_STATUS
+   */
+  @Override
+  void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws PipeManagementException {
+    LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName);
+
+    // The operation is sent as the enum ordinal, so reordering PipeTaskOperation changes this value
+    final TOperatePipeOnDataNodeReq request =
+        new TOperatePipeOnDataNodeReq()
+            .setPipeName(pipeName)
+            .setOperation((byte) PipeTaskOperation.STOP_PIPE.ordinal());
+    if (RpcUtils.squashResponseStatusList(env.operatePipeOnDataNodes(request)).getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeManagementException(
+          String.format("Failed to stop pipe instance [%s] on data nodes", pipeName));
+    }
+  }
+
+  /** Only logs the call; no rollback action is taken for the validation stage. */
+  @Override
+  protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("StopPipeProcedureV2: rollbackFromValidateTask({})", pipeName);
+    // Do nothing
+  }
+
+  /** Only logs the call; no rollback action is taken for the calculation stage. */
+  @Override
+  protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
+    LOGGER.info("StopPipeProcedureV2: rollbackFromCalculateInfoForTask({})", pipeName);
+    // Do nothing
+  }
+
+  /**
+   * Writes a {@link SetPipeStatusPlanV2} with {@link PipeStatus#RUNNING} to undo the stop.
+   *
+   * @throws PipeManagementException if the consensus write is not successful
+   */
+  @Override
+  protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
+    LOGGER.info("StopPipeProcedureV2: rollbackFromWriteConfigNodeConsensus({})", pipeName);
+
+    final ConsensusWriteResponse response =
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
+    if (!response.isSuccessful()) {
+      throw new PipeManagementException(response.getErrorMessage());
+    }
+  }
+
+  /**
+   * Sends a {@link PipeTaskOperation#START_PIPE} request for this pipe to the data nodes.
+   *
+   * @throws PipeManagementException if the squashed response status is not SUCCESS_STATUS
+   */
+  @Override
+  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
+      throws PipeManagementException {
+    LOGGER.info("StopPipeProcedureV2: rollbackFromOperateOnDataNodes({})", pipeName);
+
+    final TOperatePipeOnDataNodeReq request =
+        new TOperatePipeOnDataNodeReq()
+            .setPipeName(pipeName)
+            .setOperation((byte) PipeTaskOperation.START_PIPE.ordinal());
+    if (RpcUtils.squashResponseStatusList(env.operatePipeOnDataNodes(request)).getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeManagementException(
+          String.format("Failed to rollback from stop on data nodes for task [%s]", pipeName));
+    }
+  }
+
+  /**
+   * Writes the STOP_PIPE_PROCEDURE_V2 type code, then the superclass state, then the pipe name.
+   *
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.STOP_PIPE_PROCEDURE_V2.getTypeCode());
+    super.serialize(stream);
+    ReadWriteIOUtils.write(pipeName, stream);
+  }
+
+  /**
+   * Reads the superclass state and then the pipe name. The type code that {@link #serialize}
+   * writes first is not read here.
+   */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    pipeName = ReadWriteIOUtils.readString(byteBuffer);
+  }
+
+  /** Equal when the other object has the same class and the same pipe name (null-safe). */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    StopPipeProcedureV2 that = (StopPipeProcedureV2) o;
+    // pipeName is null on an instance built by the no-arg constructor, so compare null-safely
+    return Objects.equals(pipeName, that.pipeName);
+  }
+
+  /** Hash of the pipe name; 0 when it is null, consistent with {@link #equals}. */
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(pipeName);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java
deleted file mode 100644
index c1ffa6bdf1..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/AbstractOperatePipeProcedure.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.procedure.impl.sync;
-
-import org.apache.iotdb.commons.exception.sync.PipeException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkException;
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
-import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
-import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
-import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** This procedure manage three kinds of PIPE operations: CREATE, START and STOP */
-abstract class AbstractOperatePipeProcedure
-    extends StateMachineProcedure<ConfigNodeProcedureEnv, OperatePipeState> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractOperatePipeProcedure.class);
-
-  private static final int RETRY_THRESHOLD = 3;
-
-  /**
-   * Execute at state OPERATE_CHECK
-   *
-   * @return true if procedure can finish directly
-   */
-  abstract boolean executeCheckCanSkip(ConfigNodeProcedureEnv env)
-      throws PipeException, PipeSinkException;
-
-  /** Execute at state PRE_OPERATE_PIPE_CONFIGNODE */
-  abstract void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException;
-
-  /** Execute at state OPERATE_PIPE_DATANODE */
-  abstract void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException;
-
-  /** Execute at state OPERATE_PIPE_CONFIGNODE */
-  abstract void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException;
-
-  abstract SyncOperation getOperation();
-
-  @Override
-  protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeState state)
-      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
-    try {
-      switch (state) {
-        case OPERATE_CHECK:
-          env.getConfigManager().getSyncManager().lockSyncMetadata();
-          if (executeCheckCanSkip(env)) {
-            env.getConfigManager().getSyncManager().unlockSyncMetadata();
-            return Flow.NO_MORE_STATE;
-          }
-          setNextState(OperatePipeState.PRE_OPERATE_PIPE_CONFIGNODE);
-          break;
-        case PRE_OPERATE_PIPE_CONFIGNODE:
-          executePreOperatePipeOnConfigNode(env);
-          setNextState(OperatePipeState.OPERATE_PIPE_DATANODE);
-          break;
-        case OPERATE_PIPE_DATANODE:
-          executeOperatePipeOnDataNode(env);
-          setNextState(OperatePipeState.OPERATE_PIPE_CONFIGNODE);
-          break;
-        case OPERATE_PIPE_CONFIGNODE:
-          executeOperatePipeOnConfigNode(env);
-          env.getConfigManager().getSyncManager().unlockSyncMetadata();
-          return Flow.NO_MORE_STATE;
-      }
-    } catch (PipeException | PipeSinkException e) {
-      if (isRollbackSupported(state)) {
-        LOGGER.error("Fail in OperatePipeProcedure", e);
-        setFailure(new ProcedureException(e.getMessage()));
-      } else {
-        LOGGER.error("Retrievable error trying to {} at state [{}]", getOperation(), state, e);
-        if (getCycles() > RETRY_THRESHOLD) {
-          setFailure(
-              new ProcedureException(
-                  String.format("Fail to %s because %s", getOperation().name(), e.getMessage())));
-        }
-      }
-    }
-    return Flow.HAS_MORE_STATE;
-  }
-
-  @Override
-  protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv env) {
-    env.getSchedulerLock().lock();
-    try {
-      if (env.getPipeLock().tryLock(this)) {
-        LOGGER.info("procedureId {} acquire lock.", getProcId());
-        return ProcedureLockState.LOCK_ACQUIRED;
-      }
-      env.getPipeLock().waitProcedure(this);
-      LOGGER.info("procedureId {} wait for lock.", getProcId());
-      return ProcedureLockState.LOCK_EVENT_WAIT;
-    } finally {
-      env.getSchedulerLock().unlock();
-    }
-  }
-
-  @Override
-  protected void releaseLock(ConfigNodeProcedureEnv env) {
-    env.getSchedulerLock().lock();
-    try {
-      LOGGER.info("procedureId {} release lock.", getProcId());
-      if (env.getPipeLock().releaseLock(this)) {
-        env.getPipeLock().wakeWaitingProcedures(env.getScheduler());
-      }
-    } finally {
-      env.getSchedulerLock().unlock();
-    }
-  }
-
-  @Override
-  protected OperatePipeState getState(int stateId) {
-    return OperatePipeState.values()[stateId];
-  }
-
-  @Override
-  protected int getStateId(OperatePipeState state) {
-    return state.ordinal();
-  }
-
-  @Override
-  protected OperatePipeState getInitialState() {
-    return OperatePipeState.OPERATE_CHECK;
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
index 24dc3cef22..b1f22e5711 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/CreatePipeProcedure.java
@@ -18,38 +18,24 @@
  */
 package org.apache.iotdb.confignode.procedure.impl.sync;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.sync.PipeException;
-import org.apache.iotdb.commons.exception.sync.PipeSinkException;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipe.PipeStatus;
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
-import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
-public class CreatePipeProcedure extends AbstractOperatePipeProcedure {
-  private static final Logger LOGGER = LoggerFactory.getLogger(CreatePipeProcedure.class);
+// Empty procedure for old sync, retained only for compatibility
+public class CreatePipeProcedure extends Procedure<ConfigNodeProcedureEnv> {
 
   private PipeInfo pipeInfo;
   private Set<Integer> executedDataNodeIds = new HashSet<>();
@@ -58,111 +44,20 @@ public class CreatePipeProcedure extends AbstractOperatePipeProcedure {
     super();
   }
 
-  public CreatePipeProcedure(TCreatePipeReq req) throws PipeException {
-    super();
-    this.pipeInfo = SyncPipeUtil.parseTCreatePipeReqAsPipeInfo(req, System.currentTimeMillis());
-  }
-
-  @TestOnly
-  public void setExecutedDataNodeIds(Set<Integer> executedDataNodeIds) {
-    this.executedDataNodeIds = executedDataNodeIds;
-  }
-
   @Override
-  boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException, PipeSinkException {
-    LOGGER.info("Start to create PIPE [{}]", pipeInfo.getPipeName());
-    env.getConfigManager().getSyncManager().checkAddPipe(pipeInfo);
-    return false;
+  protected Procedure<ConfigNodeProcedureEnv>[] execute(
+      ConfigNodeProcedureEnv configNodeProcedureEnv)
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    return new Procedure[0];
   }
 
   @Override
-  void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to pre-create PIPE [{}] on Config Nodes", pipeInfo.getPipeName());
-    TSStatus status = env.getConfigManager().getSyncManager().preCreatePipe(pipeInfo);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(status.getMessage());
-    }
-  }
-
-  @Override
-  void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to broadcast create PIPE [{}] on Data Nodes", pipeInfo.getPipeName());
-    Map<Integer, TSStatus> responseMap =
-        env.getConfigManager().getSyncManager().preCreatePipeOnDataNodes(pipeInfo);
-    TSStatus status = RpcUtils.squashResponseStatusList(new ArrayList<>(responseMap.values()));
-    executedDataNodeIds.addAll(responseMap.keySet());
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(
-          String.format(
-              "Fail to create PIPE [%s] because %s.",
-              pipeInfo.getPipeName(),
-              StringUtils.join(
-                  responseMap.values().stream()
-                      .filter(i -> i.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
-                      .map(TSStatus::getMessage)
-                      .toArray(),
-                  ", ")));
-    }
-  }
+  protected void rollback(ConfigNodeProcedureEnv configNodeProcedureEnv)
+      throws IOException, InterruptedException, ProcedureException {}
 
   @Override
-  void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to create PIPE [{}] on Config Nodes", pipeInfo.getPipeName());
-    TSStatus status =
-        env.getConfigManager()
-            .getSyncManager()
-            .setPipeStatus(pipeInfo.getPipeName(), PipeStatus.STOP);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(status.getMessage());
-    }
-  }
-
-  @Override
-  SyncOperation getOperation() {
-    return SyncOperation.CREATE_PIPE;
-  }
-
-  @Override
-  protected boolean isRollbackSupported(OperatePipeState state) {
-    switch (state) {
-      case OPERATE_CHECK:
-      case PRE_OPERATE_PIPE_CONFIGNODE:
-      case OPERATE_PIPE_DATANODE:
-        return true;
-      default:
-        return false;
-    }
-  }
-
-  @Override
-  protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
-      throws IOException, InterruptedException, ProcedureException {
-    LOGGER.info("Roll back CreatePipeProcedure at STATE [{}]", state);
-    switch (state) {
-      case OPERATE_CHECK:
-        env.getConfigManager().getSyncManager().unlockSyncMetadata();
-        break;
-      case PRE_OPERATE_PIPE_CONFIGNODE:
-        TSStatus status = env.getConfigManager().getSyncManager().dropPipe(pipeInfo.getPipeName());
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          throw new ProcedureException(
-              String.format(
-                  "Failed to create pipe and failed to roll back because %s. Please execute [DROP PIPE %s] manually.",
-                  status.getMessage(), pipeInfo.getPipeName()));
-        }
-        break;
-      case OPERATE_PIPE_DATANODE:
-        env.getConfigManager()
-            .getSyncManager()
-            .operatePipeOnDataNodesForRollback(
-                pipeInfo.getPipeName(),
-                pipeInfo.getCreateTime(),
-                SyncOperation.DROP_PIPE,
-                executedDataNodeIds);
-        break;
-      default:
-        LOGGER.error("Unsupported roll back STATE [{}]", state);
-    }
+  protected boolean abort(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    return false;
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
index fd7f6f36a8..aa61bc8949 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/DropPipeProcedure.java
@@ -18,32 +18,21 @@
  */
 package org.apache.iotdb.confignode.procedure.impl.sync;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.sync.PipeException;
-import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipe.PipeStatus;
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
+import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Map;
 import java.util.Objects;
 
-public class DropPipeProcedure extends AbstractOperatePipeProcedure {
-  private static final Logger LOGGER = LoggerFactory.getLogger(DropPipeProcedure.class);
+// Empty procedure for old sync, retained only for compatibility
+public class DropPipeProcedure extends Procedure<ConfigNodeProcedureEnv> {
 
   private String pipeName;
 
@@ -51,80 +40,20 @@ public class DropPipeProcedure extends AbstractOperatePipeProcedure {
     super();
   }
 
-  public DropPipeProcedure(String pipeName) throws PipeException {
-    super();
-    this.pipeName = pipeName;
-  }
-
-  @Override
-  boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to drop PIPE [{}]", pipeName);
-    // throw PipeNotExistException if pipe not exist
-    PipeInfo pipeInfo = env.getConfigManager().getSyncManager().getPipeInfo(pipeName);
-    return false;
-  }
-
   @Override
-  void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to pre-drop PIPE [{}] on Config Nodes", pipeName);
-    TSStatus status =
-        env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.DROP);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(status.getMessage());
-    }
+  protected Procedure<ConfigNodeProcedureEnv>[] execute(
+      ConfigNodeProcedureEnv configNodeProcedureEnv)
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    return new Procedure[0];
   }
 
   @Override
-  void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to broadcast drop PIPE [{}] on Data Nodes", pipeName);
-    Map<Integer, TSStatus> responseMap =
-        env.getConfigManager()
-            .getSyncManager()
-            .operatePipeOnDataNodes(pipeName, SyncOperation.DROP_PIPE);
-    TSStatus status = RpcUtils.squashResponseStatusList(new ArrayList<>(responseMap.values()));
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(
-          String.format(
-              "Fail to drop PIPE [%s] because %s. Please execute [DROP PIPE %s] later to retry.",
-              pipeName,
-              StringUtils.join(
-                  responseMap.values().stream()
-                      .filter(i -> i.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
-                      .map(TSStatus::getMessage)
-                      .toArray(),
-                  ", "),
-              pipeName));
-    }
-  }
+  protected void rollback(ConfigNodeProcedureEnv configNodeProcedureEnv)
+      throws IOException, InterruptedException, ProcedureException {}
 
   @Override
-  void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to drop PIPE [{}] on Config Nodes", pipeName);
-    TSStatus status = env.getConfigManager().getSyncManager().dropPipe(pipeName);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(status.getMessage());
-    }
-  }
-
-  @Override
-  SyncOperation getOperation() {
-    return SyncOperation.DROP_PIPE;
-  }
-
-  @Override
-  protected boolean isRollbackSupported(OperatePipeState state) {
-    return state == OperatePipeState.OPERATE_CHECK;
-  }
-
-  @Override
-  protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
-      throws IOException, InterruptedException, ProcedureException {
-    LOGGER.info("Roll back DropPipeProcedure at STATE [{}]", state);
-    if (state == OperatePipeState.OPERATE_CHECK) {
-      env.getConfigManager().getSyncManager().unlockSyncMetadata();
-    } else {
-      LOGGER.error("Unsupported roll back STATE [{}]", state);
-    }
+  protected boolean abort(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    return false;
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
index d7e68434d9..658d319cbd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
@@ -18,35 +18,24 @@
  */
 package org.apache.iotdb.confignode.procedure.impl.sync;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipe.PipeStatus;
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
-public class StartPipeProcedure extends AbstractOperatePipeProcedure {
-  private static final Logger LOGGER = LoggerFactory.getLogger(StartPipeProcedure.class);
+// Empty procedure for old sync, used only for compatibility
+public class StartPipeProcedure extends Procedure<ConfigNodeProcedureEnv> {
 
   private String pipeName;
   private PipeInfo pipeInfo;
@@ -56,119 +45,20 @@ public class StartPipeProcedure extends AbstractOperatePipeProcedure {
     super();
   }
 
-  public StartPipeProcedure(String pipeName) throws PipeException {
-    super();
-    this.pipeName = pipeName;
-  }
-
-  @TestOnly
-  public void setPipeInfo(PipeInfo pipeInfo) {
-    this.pipeInfo = pipeInfo;
-  }
-
-  @TestOnly
-  public void setExecutedDataNodeIds(Set<Integer> executedDataNodeIds) {
-    this.executedDataNodeIds = executedDataNodeIds;
-  }
-
   @Override
-  boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to start PIPE [{}]", pipeName);
-    pipeInfo = env.getConfigManager().getSyncManager().getPipeInfo(pipeName);
-    if (pipeInfo.getStatus().equals(PipeStatus.DROP)) {
-      throw new PipeException(
-          String.format("PIPE [%s] has been dropped and cannot be started again.", pipeName));
-    }
-    return pipeInfo.getStatus().equals(PipeStatus.RUNNING);
+  protected Procedure<ConfigNodeProcedureEnv>[] execute(
+      ConfigNodeProcedureEnv configNodeProcedureEnv)
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    return new Procedure[0];
   }
 
   @Override
-  void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to pre-start PIPE [{}] on Config Nodes", pipeName);
-    TSStatus status =
-        env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.PARTIAL_START);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(status.getMessage());
-    }
-  }
+  protected void rollback(ConfigNodeProcedureEnv configNodeProcedureEnv)
+      throws IOException, InterruptedException, ProcedureException {}
 
   @Override
-  void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to broadcast start PIPE [{}] on Data Nodes", pipeName);
-    Map<Integer, TSStatus> responseMap =
-        env.getConfigManager()
-            .getSyncManager()
-            .operatePipeOnDataNodes(pipeName, SyncOperation.START_PIPE);
-    TSStatus status = RpcUtils.squashResponseStatusList(new ArrayList<>(responseMap.values()));
-    executedDataNodeIds.addAll(responseMap.keySet());
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(
-          String.format(
-              "Fail to start PIPE [%s] because %s.",
-              pipeName,
-              StringUtils.join(
-                  responseMap.values().stream()
-                      .filter(i -> i.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
-                      .map(TSStatus::getMessage)
-                      .toArray(),
-                  ", ")));
-    }
-  }
-
-  @Override
-  void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to start PIPE [{}] on Config Nodes", pipeName);
-    TSStatus status =
-        env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.RUNNING);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(status.getMessage());
-    }
-  }
-
-  @Override
-  SyncOperation getOperation() {
-    return SyncOperation.START_PIPE;
-  }
-
-  @Override
-  protected boolean isRollbackSupported(OperatePipeState state) {
-    switch (state) {
-      case OPERATE_CHECK:
-      case PRE_OPERATE_PIPE_CONFIGNODE:
-      case OPERATE_PIPE_DATANODE:
-        return true;
-      default:
-        return false;
-    }
-  }
-
-  @Override
-  protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
-      throws IOException, InterruptedException, ProcedureException {
-    LOGGER.info("Roll back StartPipeProcedure at STATE [{}]", state);
-    switch (state) {
-      case OPERATE_CHECK:
-        env.getConfigManager().getSyncManager().unlockSyncMetadata();
-        break;
-      case PRE_OPERATE_PIPE_CONFIGNODE:
-        TSStatus status =
-            env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.STOP);
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          throw new ProcedureException(
-              String.format(
-                  "Failed to start pipe and failed to roll back because %s. Please execute [STOP PIPE %s] manually.",
-                  status.getMessage(), pipeName));
-        }
-        break;
-      case OPERATE_PIPE_DATANODE:
-        env.getConfigManager()
-            .getSyncManager()
-            .operatePipeOnDataNodesForRollback(
-                pipeName, pipeInfo.getCreateTime(), SyncOperation.STOP_PIPE, executedDataNodeIds);
-        break;
-      default:
-        LOGGER.error("Unsupported roll back STATE [{}]", state);
-    }
+  protected boolean abort(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    return false;
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
index 0f15b04840..88f4d61edd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StopPipeProcedure.java
@@ -18,35 +18,24 @@
  */
 package org.apache.iotdb.confignode.procedure.impl.sync;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
-import org.apache.iotdb.commons.sync.pipe.PipeStatus;
-import org.apache.iotdb.commons.sync.pipe.SyncOperation;
-import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.state.sync.OperatePipeState;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
-public class StopPipeProcedure extends AbstractOperatePipeProcedure {
-  private static final Logger LOGGER = LoggerFactory.getLogger(StopPipeProcedure.class);
+// Empty procedure for old sync, used only for compatibility
+public class StopPipeProcedure extends Procedure<ConfigNodeProcedureEnv> {
 
   private String pipeName;
   private PipeInfo pipeInfo;
@@ -56,119 +45,20 @@ public class StopPipeProcedure extends AbstractOperatePipeProcedure {
     super();
   }
 
-  public StopPipeProcedure(String pipeName) throws PipeException {
-    super();
-    this.pipeName = pipeName;
-  }
-
-  @TestOnly
-  public void setPipeInfo(PipeInfo pipeInfo) {
-    this.pipeInfo = pipeInfo;
-  }
-
-  @TestOnly
-  public void setExecutedDataNodeIds(Set<Integer> executedDataNodeIds) {
-    this.executedDataNodeIds = executedDataNodeIds;
-  }
-
   @Override
-  boolean executeCheckCanSkip(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to stop PIPE [{}]", pipeName);
-    pipeInfo = env.getConfigManager().getSyncManager().getPipeInfo(pipeName);
-    if (pipeInfo.getStatus().equals(PipeStatus.DROP)) {
-      throw new PipeException(
-          String.format("PIPE [%s] has been dropped and cannot be stopped again.", pipeName));
-    }
-    return pipeInfo.getStatus().equals(PipeStatus.STOP);
+  protected Procedure<ConfigNodeProcedureEnv>[] execute(
+      ConfigNodeProcedureEnv configNodeProcedureEnv)
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    return new Procedure[0];
   }
 
   @Override
-  void executePreOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to pre-stop PIPE [{}] on Config Nodes", pipeName);
-    TSStatus status =
-        env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.PARTIAL_STOP);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(status.getMessage());
-    }
-  }
+  protected void rollback(ConfigNodeProcedureEnv configNodeProcedureEnv)
+      throws IOException, InterruptedException, ProcedureException {}
 
   @Override
-  void executeOperatePipeOnDataNode(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to broadcast stop PIPE [{}] on Data Nodes", pipeName);
-    Map<Integer, TSStatus> responseMap =
-        env.getConfigManager()
-            .getSyncManager()
-            .operatePipeOnDataNodes(pipeName, SyncOperation.STOP_PIPE);
-    TSStatus status = RpcUtils.squashResponseStatusList(new ArrayList<>(responseMap.values()));
-    executedDataNodeIds.addAll(responseMap.keySet());
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(
-          String.format(
-              "Fail to stop PIPE [%s] because %s.",
-              pipeName,
-              StringUtils.join(
-                  responseMap.values().stream()
-                      .filter(i -> i.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
-                      .map(TSStatus::getMessage)
-                      .toArray(),
-                  ", ")));
-    }
-  }
-
-  @Override
-  void executeOperatePipeOnConfigNode(ConfigNodeProcedureEnv env) throws PipeException {
-    LOGGER.info("Start to stop PIPE [{}] on Config Nodes", pipeName);
-    TSStatus status =
-        env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.STOP);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeException(status.getMessage());
-    }
-  }
-
-  @Override
-  SyncOperation getOperation() {
-    return SyncOperation.STOP_PIPE;
-  }
-
-  @Override
-  protected boolean isRollbackSupported(OperatePipeState state) {
-    switch (state) {
-      case OPERATE_CHECK:
-      case PRE_OPERATE_PIPE_CONFIGNODE:
-      case OPERATE_PIPE_DATANODE:
-        return true;
-      default:
-        return false;
-    }
-  }
-
-  @Override
-  protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeState state)
-      throws IOException, InterruptedException, ProcedureException {
-    LOGGER.info("Roll back StopPipeProcedure at STATE [{}]", state);
-    switch (state) {
-      case OPERATE_CHECK:
-        env.getConfigManager().getSyncManager().unlockSyncMetadata();
-        break;
-      case PRE_OPERATE_PIPE_CONFIGNODE:
-        TSStatus status =
-            env.getConfigManager().getSyncManager().setPipeStatus(pipeName, PipeStatus.RUNNING);
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          throw new ProcedureException(
-              String.format(
-                  "Failed to stop pipe and failed to roll back because %s. Please execute [STOP PIPE %s] manually.",
-                  status.getMessage(), pipeName));
-        }
-        break;
-      case OPERATE_PIPE_DATANODE:
-        env.getConfigManager()
-            .getSyncManager()
-            .operatePipeOnDataNodesForRollback(
-                pipeName, pipeInfo.getCreateTime(), SyncOperation.START_PIPE, executedDataNodeIds);
-        break;
-      default:
-        LOGGER.error("Unsupported roll back STATE [{}]", state);
-    }
+  protected boolean abort(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    return false;
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/sync/OperatePipeState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/task/OperatePipeTaskState.java
similarity index 80%
rename from confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/sync/OperatePipeState.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/task/OperatePipeTaskState.java
index c8a263ec43..fdbc65fe4d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/sync/OperatePipeState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/task/OperatePipeTaskState.java
@@ -16,11 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.procedure.state.sync;
 
-public enum OperatePipeState {
-  OPERATE_CHECK,
-  PRE_OPERATE_PIPE_CONFIGNODE,
-  OPERATE_PIPE_DATANODE,
-  OPERATE_PIPE_CONFIGNODE
+package org.apache.iotdb.confignode.procedure.state.pipe.task;
+
+public enum OperatePipeTaskState {
+  VALIDATE_TASK,
+  CALCULATE_INFO_FOR_TASK,
+  OPERATE_ON_DATA_NODES,
+  WRITE_CONFIG_NODE_CONSENSUS
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 9ef0b7422d..7e016c4dc8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -28,6 +28,10 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure
 import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2;
+import org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
 import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
 import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
@@ -102,6 +106,18 @@ public class ProcedureFactory implements IProcedureFactory {
       case DROP_PIPE_PROCEDURE:
         procedure = new DropPipeProcedure();
         break;
+      case CREATE_PIPE_PROCEDURE_V2:
+        procedure = new CreatePipeProcedureV2();
+        break;
+      case START_PIPE_PROCEDURE_V2:
+        procedure = new StartPipeProcedureV2();
+        break;
+      case STOP_PIPE_PROCEDURE_V2:
+        procedure = new StopPipeProcedureV2();
+        break;
+      case DROP_PIPE_PROCEDURE_V2:
+        procedure = new DropPipeProcedureV2();
+        break;
       case CREATE_CQ_PROCEDURE:
         procedure =
             new CreateCQProcedure(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index c5d6054646..74869f51d3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -44,7 +44,7 @@ public enum ProcedureType {
   CREATE_TRIGGER_PROCEDURE((short) 400),
   DROP_TRIGGER_PROCEDURE((short) 401),
 
-  /** Sync */
+  /** Old sync */
   CREATE_PIPE_PROCEDURE((short) 500),
   START_PIPE_PROCEDURE((short) 501),
   STOP_PIPE_PROCEDURE((short) 502),
@@ -63,7 +63,13 @@ public enum ProcedureType {
 
   /** Pipe Plugin */
   CREATE_PIPE_PLUGIN_PROCEDURE((short) 900),
-  DROP_PIPE_PLUGIN_PROCEDURE((short) 901);
+  DROP_PIPE_PLUGIN_PROCEDURE((short) 901),
+
+  /** Pipe Task */
+  CREATE_PIPE_PROCEDURE_V2((short) 1001),
+  START_PIPE_PROCEDURE_V2((short) 1002),
+  STOP_PIPE_PROCEDURE_V2((short) 1003),
+  DROP_PIPE_PROCEDURE_V2((short) 1004);
 
   private final short typeCode;
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 39975c550d..2e80a9b0d1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -55,8 +55,6 @@ import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaRep
 import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
 import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
 import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
 import org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp;
 import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
 import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp;
@@ -853,18 +851,24 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
   }
 
   @Override
+  @Deprecated
   public TSStatus createPipeSink(TPipeSinkInfo req) {
-    return configManager.createPipeSink(new CreatePipeSinkPlan(req));
+    // To be deleted. Deprecated no-op: ignores req and always returns SUCCESS_STATUS.
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   @Override
+  @Deprecated
   public TSStatus dropPipeSink(TDropPipeSinkReq req) {
-    return configManager.dropPipeSink(new DropPipeSinkPlan(req.getPipeSinkName()));
+    // To be deleted. Deprecated no-op: ignores req and always returns SUCCESS_STATUS.
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   @Override
+  @Deprecated
   public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) {
-    return configManager.getPipeSink(req);
+    // To be deleted. NOTE(review): no response fields are populated here; confirm callers cope.
+    return new TGetPipeSinkResp();
   }
 
   @Override
@@ -893,11 +897,13 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
   }
 
   @Override
+  @Deprecated
   public TGetAllPipeInfoResp getAllPipeInfo() {
     return configManager.getAllPipeInfo();
   }
 
   @Override
+  @Deprecated
   public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
     return configManager.recordPipeMessage(req);
   }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 83c26ce16e..203e9f6a36 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -39,7 +39,12 @@ import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.SeriesPartitionTable;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeConsensusGroupTaskMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.sync.pipe.PipeInfo;
+import org.apache.iotdb.commons.sync.pipe.PipeMessage;
 import org.apache.iotdb.commons.sync.pipe.PipeStatus;
 import org.apache.iotdb.commons.sync.pipe.TsFilePipeInfo;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
@@ -85,6 +90,11 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
 import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
 import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -93,12 +103,13 @@ import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGr
 import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
+import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlanV1;
+import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlanV1;
 import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan;
 import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan;
@@ -954,9 +965,9 @@ public class ConfigPhysicalPlanSerDeTest {
             .setPipeSinkName("demo")
             .setPipeSinkType("IoTDB")
             .setAttributes(attributes);
-    CreatePipeSinkPlan createPipeSinkPlan = new CreatePipeSinkPlan(pipeSinkInfo);
-    CreatePipeSinkPlan createPipeSinkPlan1 =
-        (CreatePipeSinkPlan)
+    CreatePipeSinkPlanV1 createPipeSinkPlan = new CreatePipeSinkPlanV1(pipeSinkInfo);
+    CreatePipeSinkPlanV1 createPipeSinkPlan1 =
+        (CreatePipeSinkPlanV1)
             ConfigPhysicalPlan.Factory.create(createPipeSinkPlan.serializeToByteBuffer());
     Assert.assertEquals(
         createPipeSinkPlan.getPipeSinkInfo(), createPipeSinkPlan1.getPipeSinkInfo());
@@ -964,23 +975,23 @@ public class ConfigPhysicalPlanSerDeTest {
 
   @Test
   public void DropPipeSinkPlanTest() throws IOException {
-    DropPipeSinkPlan dropPipeSinkPlan = new DropPipeSinkPlan("demo");
-    DropPipeSinkPlan dropPipeSinkPlan1 =
-        (DropPipeSinkPlan)
+    DropPipeSinkPlanV1 dropPipeSinkPlan = new DropPipeSinkPlanV1("demo");
+    DropPipeSinkPlanV1 dropPipeSinkPlan1 =
+        (DropPipeSinkPlanV1)
             ConfigPhysicalPlan.Factory.create(dropPipeSinkPlan.serializeToByteBuffer());
     Assert.assertEquals(dropPipeSinkPlan.getPipeSinkName(), dropPipeSinkPlan1.getPipeSinkName());
   }
 
   @Test
   public void GetPipeSinkPlanTest() throws IOException {
-    GetPipeSinkPlan getPipeSinkPlan = new GetPipeSinkPlan("demo");
-    GetPipeSinkPlan getPipeSinkPlan1 =
-        (GetPipeSinkPlan)
+    GetPipeSinkPlanV1 getPipeSinkPlan = new GetPipeSinkPlanV1("demo");
+    GetPipeSinkPlanV1 getPipeSinkPlan1 =
+        (GetPipeSinkPlanV1)
             ConfigPhysicalPlan.Factory.create(getPipeSinkPlan.serializeToByteBuffer());
     Assert.assertEquals(getPipeSinkPlan.getPipeSinkName(), getPipeSinkPlan1.getPipeSinkName());
-    GetPipeSinkPlan getPipeSinkPlanWithNullName = new GetPipeSinkPlan();
-    GetPipeSinkPlan getPipeSinkPlanWithNullName1 =
-        (GetPipeSinkPlan)
+    GetPipeSinkPlanV1 getPipeSinkPlanWithNullName = new GetPipeSinkPlanV1();
+    GetPipeSinkPlanV1 getPipeSinkPlanWithNullName1 =
+        (GetPipeSinkPlanV1)
             ConfigPhysicalPlan.Factory.create(getPipeSinkPlanWithNullName.serializeToByteBuffer());
     Assert.assertEquals(
         getPipeSinkPlanWithNullName.getPipeSinkName(),
@@ -992,37 +1003,120 @@ public class ConfigPhysicalPlanSerDeTest {
     PipeInfo pipeInfo =
         new TsFilePipeInfo(
             "name", "demo", PipeStatus.PARTIAL_CREATE, System.currentTimeMillis(), 999, false);
-    PreCreatePipePlan PreCreatePipePlan = new PreCreatePipePlan(pipeInfo);
-    PreCreatePipePlan PreCreatePipePlan1 =
-        (PreCreatePipePlan)
+    PreCreatePipePlanV1 PreCreatePipePlan = new PreCreatePipePlanV1(pipeInfo);
+    PreCreatePipePlanV1 PreCreatePipePlan1 =
+        (PreCreatePipePlanV1)
             ConfigPhysicalPlan.Factory.create(PreCreatePipePlan.serializeToByteBuffer());
     Assert.assertEquals(PreCreatePipePlan.getPipeInfo(), PreCreatePipePlan1.getPipeInfo());
   }
 
   @Test
-  public void SetPipeStatusPlan() throws IOException {
-    SetPipeStatusPlan setPipeStatusPlan = new SetPipeStatusPlan("pipe", PipeStatus.PARTIAL_CREATE);
-    SetPipeStatusPlan setPipeStatusPlan1 =
-        (SetPipeStatusPlan)
+  public void RecordPipeMessagePlanTest() throws IOException {
+    RecordPipeMessagePlan recordPipeMessagePlan =
+        new RecordPipeMessagePlan(
+            "testPipe", new PipeMessage(PipeMessage.PipeMessageType.ERROR, "testError"));
+    RecordPipeMessagePlan recordPipeMessagePlan1 =
+        (RecordPipeMessagePlan)
+            ConfigPhysicalPlan.Factory.create(recordPipeMessagePlan.serializeToByteBuffer());
+    Assert.assertEquals(recordPipeMessagePlan.getPipeName(), recordPipeMessagePlan1.getPipeName());
+    Assert.assertEquals(
+        recordPipeMessagePlan.getPipeMessage().getType(),
+        recordPipeMessagePlan1.getPipeMessage().getType());
+    Assert.assertEquals(
+        recordPipeMessagePlan.getPipeMessage().getMessage(),
+        recordPipeMessagePlan1.getPipeMessage().getMessage());
+  }
+
+  @Test
+  public void SetPipeStatusPlanTest() throws IOException {
+    SetPipeStatusPlanV1 setPipeStatusPlan =
+        new SetPipeStatusPlanV1("pipe", PipeStatus.PARTIAL_CREATE);
+    SetPipeStatusPlanV1 setPipeStatusPlan1 =
+        (SetPipeStatusPlanV1)
             ConfigPhysicalPlan.Factory.create(setPipeStatusPlan.serializeToByteBuffer());
     Assert.assertEquals(setPipeStatusPlan.getPipeName(), setPipeStatusPlan1.getPipeName());
     Assert.assertEquals(setPipeStatusPlan.getPipeStatus(), setPipeStatusPlan1.getPipeStatus());
   }
 
+  @Test
+  public void CreatePipePlanV2Test() throws IOException {
+    Map<String, String> collectorAttributes = new HashMap<>();
+    Map<String, String> processorAttributes = new HashMap<>();
+    Map<String, String> connectorAttributes = new HashMap<>();
+    collectorAttributes.put("collector", "org.apache.iotdb.pipe.collector.DefaultCollector");
+    processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor");
+    connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter");
+    PipeConsensusGroupTaskMeta pipeConsensusGroupTaskMeta = new PipeConsensusGroupTaskMeta(0, 1);
+    Map<TConsensusGroupId, PipeConsensusGroupTaskMeta> pipeTasks = new HashMap<>();
+    pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeConsensusGroupTaskMeta);
+    PipeStaticMeta pipeStaticMeta =
+        new PipeStaticMeta(
+            "testPipe", 121, collectorAttributes, processorAttributes, connectorAttributes);
+    PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
+    CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta);
+    CreatePipePlanV2 createPipePlanV21 =
+        (CreatePipePlanV2)
+            ConfigPhysicalPlan.Factory.create(createPipePlanV2.serializeToByteBuffer());
+    Assert.assertEquals(
+        createPipePlanV2.getPipeStaticMeta(), createPipePlanV21.getPipeStaticMeta());
+  }
+
+  @Test
+  public void SetPipeStatusPlanV2Test() throws IOException {
+    SetPipeStatusPlanV2 setPipeStatusPlanV2 =
+        new SetPipeStatusPlanV2("pipe", org.apache.iotdb.commons.pipe.task.meta.PipeStatus.RUNNING);
+    SetPipeStatusPlanV2 setPipeStatusPlanV21 =
+        (SetPipeStatusPlanV2)
+            ConfigPhysicalPlan.Factory.create(setPipeStatusPlanV2.serializeToByteBuffer());
+    Assert.assertEquals(setPipeStatusPlanV2.getPipeName(), setPipeStatusPlanV21.getPipeName());
+    Assert.assertEquals(setPipeStatusPlanV2.getPipeStatus(), setPipeStatusPlanV21.getPipeStatus());
+  }
+
+  @Test
+  public void DropPipePlanV2Test() throws IOException {
+    DropPipePlanV2 dropPipePlanV2 = new DropPipePlanV2("demo");
+    DropPipePlanV2 dropPipePlanV21 =
+        (DropPipePlanV2) ConfigPhysicalPlan.Factory.create(dropPipePlanV2.serializeToByteBuffer());
+    Assert.assertEquals(dropPipePlanV2.getPipeName(), dropPipePlanV21.getPipeName());
+  }
+
   @Test
   public void ShowPipePlanTest() throws IOException {
-    ShowPipePlan showPipePlan = new ShowPipePlan("demo");
-    ShowPipePlan showPipePlan1 =
-        (ShowPipePlan) ConfigPhysicalPlan.Factory.create(showPipePlan.serializeToByteBuffer());
+    ShowPipePlanV1 showPipePlan = new ShowPipePlanV1("demo");
+    ShowPipePlanV1 showPipePlan1 =
+        (ShowPipePlanV1) ConfigPhysicalPlan.Factory.create(showPipePlan.serializeToByteBuffer());
     Assert.assertEquals(showPipePlan.getPipeName(), showPipePlan1.getPipeName());
-    ShowPipePlan showPipePlanWithNullName = new ShowPipePlan();
-    ShowPipePlan showPipePlanWithNullName1 =
-        (ShowPipePlan)
+    ShowPipePlanV1 showPipePlanWithNullName = new ShowPipePlanV1();
+    ShowPipePlanV1 showPipePlanWithNullName1 =
+        (ShowPipePlanV1)
             ConfigPhysicalPlan.Factory.create(showPipePlanWithNullName.serializeToByteBuffer());
     Assert.assertEquals(
         showPipePlanWithNullName.getPipeName(), showPipePlanWithNullName1.getPipeName());
   }
 
+  @Test
+  public void CreatePipePluginPlanTest() throws IOException {
+    CreatePipePluginPlan createPipePluginPlan =
+        new CreatePipePluginPlan(
+            new PipePluginMeta("testPlugin", "org.apache.iotdb.testJar", "testJar", "???"),
+            new Binary("123"));
+    CreatePipePluginPlan createPipePluginPlan1 =
+        (CreatePipePluginPlan)
+            ConfigPhysicalPlan.Factory.create(createPipePluginPlan.serializeToByteBuffer());
+    Assert.assertEquals(
+        createPipePluginPlan.getPipePluginMeta(), createPipePluginPlan1.getPipePluginMeta());
+    Assert.assertEquals(createPipePluginPlan.getJarFile(), createPipePluginPlan1.getJarFile());
+  }
+
+  @Test
+  public void DropPipePluginPlanTest() throws IOException {
+    DropPipePluginPlan dropPipePluginPlan = new DropPipePluginPlan("testPlugin");
+    DropPipePluginPlan dropPipePluginPlan1 =
+        (DropPipePluginPlan)
+            ConfigPhysicalPlan.Factory.create(dropPipePluginPlan.serializeToByteBuffer());
+    Assert.assertEquals(dropPipePluginPlan.getPluginName(), dropPipePluginPlan1.getPluginName());
+  }
+
   @Test
   public void GetTriggerTablePlanTest() throws IOException {
     GetTriggerTablePlan getTriggerTablePlan0 = new GetTriggerTablePlan(true);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java
deleted file mode 100644
index d69eeec641..0000000000
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSyncInfoTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.persistence;
-
-import org.apache.iotdb.commons.exception.sync.PipeSinkException;
-import org.apache.iotdb.commons.sync.pipesink.PipeSink;
-import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan;
-import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan;
-import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
-
-public class ClusterSyncInfoTest {
-
-  private ClusterSyncInfo clusterSyncInfo;
-  private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
-
-  @Before
-  public void setup() throws IOException {
-    clusterSyncInfo = new ClusterSyncInfo();
-    if (!snapshotDir.exists()) {
-      snapshotDir.mkdirs();
-    }
-  }
-
-  @After
-  public void cleanup() throws IOException {
-    if (snapshotDir.exists()) {
-      FileUtils.deleteDirectory(snapshotDir);
-    }
-  }
-
-  private void prepareClusterSyncInfo() {
-    Map<String, String> attributes1 = new HashMap<>();
-    attributes1.put("ip", "192.168.11.11");
-    attributes1.put("port", "7766");
-    TPipeSinkInfo pipeSinkInfo1 =
-        new TPipeSinkInfo()
-            .setPipeSinkName("demo1")
-            .setPipeSinkType("IoTDB")
-            .setAttributes(attributes1);
-    Map<String, String> attributes2 = new HashMap<>();
-    attributes2.put("ip", "192.168.22.2");
-    attributes2.put("port", "7777");
-    TPipeSinkInfo pipeSinkInfo2 =
-        new TPipeSinkInfo()
-            .setPipeSinkName("demo2")
-            .setPipeSinkType("IoTDB")
-            .setAttributes(attributes2);
-
-    clusterSyncInfo.addPipeSink(new CreatePipeSinkPlan(pipeSinkInfo1));
-    clusterSyncInfo.addPipeSink(new CreatePipeSinkPlan(pipeSinkInfo2));
-  }
-
-  @Test
-  public void testEmptySnapshot() throws Exception {
-    // test empty snapshot
-    Assert.assertTrue(clusterSyncInfo.processTakeSnapshot(snapshotDir));
-    ClusterSyncInfo clusterSyncInfo2 = new ClusterSyncInfo();
-    clusterSyncInfo2.processLoadSnapshot(snapshotDir);
-
-    List<PipeSink> expectedPipeSink =
-        clusterSyncInfo.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
-    List<PipeSink> actualPipeSink =
-        clusterSyncInfo2.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
-    Assert.assertEquals(expectedPipeSink, actualPipeSink);
-  }
-
-  @Test
-  public void testSnapshot() throws Exception {
-    // test snapshot with data
-    prepareClusterSyncInfo();
-    Assert.assertTrue(clusterSyncInfo.processTakeSnapshot(snapshotDir));
-    ClusterSyncInfo clusterSyncInfo2 = new ClusterSyncInfo();
-    clusterSyncInfo2.processLoadSnapshot(snapshotDir);
-
-    List<PipeSink> expectedPipeSink =
-        clusterSyncInfo.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
-    List<PipeSink> actualPipeSink =
-        clusterSyncInfo2.getPipeSink(new GetPipeSinkPlan()).getPipeSinkList();
-    Assert.assertEquals(expectedPipeSink, actualPipeSink);
-  }
-
-  @Test
-  public void testPipeSinkOperation() {
-    prepareClusterSyncInfo();
-    Map<String, String> attributes1 = new HashMap<>();
-    attributes1.put("ip", "192.168.11.11");
-    attributes1.put("port", "7766");
-    Map<String, String> attributes2 = new HashMap<>();
-    attributes2.put("ip", "Nonstandard");
-    attributes2.put("port", "7777");
-    TPipeSinkInfo alreadyExistSink =
-        new TPipeSinkInfo()
-            .setPipeSinkName("demo1")
-            .setPipeSinkType("IoTDB")
-            .setAttributes(attributes1);
-    TPipeSinkInfo errorAttributeSink =
-        new TPipeSinkInfo()
-            .setPipeSinkName("demo3")
-            .setPipeSinkType("IoTDB")
-            .setAttributes(attributes2);
-    TPipeSinkInfo nonExistSink =
-        new TPipeSinkInfo()
-            .setPipeSinkName("demo3")
-            .setPipeSinkType("IoTDB")
-            .setAttributes(attributes1);
-
-    try {
-      clusterSyncInfo.checkAddPipeSink(new CreatePipeSinkPlan(alreadyExistSink));
-      Assert.fail("checkOperatePipeSink ignore failure.");
-    } catch (PipeSinkException e) {
-      // nothing
-    }
-    try {
-      clusterSyncInfo.checkAddPipeSink(new CreatePipeSinkPlan(alreadyExistSink));
-      Assert.fail("checkOperatePipeSink ignore failure.");
-    } catch (PipeSinkException e) {
-      // nothing
-    }
-    try {
-      clusterSyncInfo.checkAddPipeSink(new CreatePipeSinkPlan(errorAttributeSink));
-      Assert.fail("checkOperatePipeSink ignore failure.");
-    } catch (PipeSinkException e) {
-      // nothing
-    }
-
-    try {
-      clusterSyncInfo.checkAddPipeSink(new CreatePipeSinkPlan(nonExistSink));
-      clusterSyncInfo.checkDropPipeSink("demo1");
-    } catch (PipeSinkException e) {
-      Assert.fail("checkOperatePipeSink should not throw exception.");
-    }
-  }
-}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
new file mode 100644
index 0000000000..6384652baa
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeConsensusGroupTaskMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.DataRegion;
+import static org.apache.iotdb.db.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class PipeInfoTest {
+  // Round-trip test: a PipeInfo saved with processTakeSnapshot must equal the one loaded back.
+  private static PipeInfo pipeInfo;
+  private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
+  // Before each test: start from a new PipeInfo and create the snapshot directory if it is absent.
+  @Before
+  public void setup() throws IOException {
+    pipeInfo = new PipeInfo();
+    if (!snapshotDir.exists()) {
+      snapshotDir.mkdirs(); // return value is not checked
+    }
+  }
+  // After each test: delete the snapshot directory if it exists.
+  @After
+  public void cleanup() throws IOException {
+    if (snapshotDir.exists()) {
+      FileUtils.deleteDirectory(snapshotDir);
+    }
+  }
+  // Creates one pipe and one pipe plugin, takes a snapshot, reloads it, and compares both instances.
+  @Test
+  public void testSnapshot() throws TException, IOException {
+    Map<String, String> collectorAttributes = new HashMap<>();
+    Map<String, String> processorAttributes = new HashMap<>();
+    Map<String, String> connectorAttributes = new HashMap<>();
+    collectorAttributes.put("collector", "org.apache.iotdb.pipe.collector.DefaultCollector");
+    processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor");
+    connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter");
+    PipeConsensusGroupTaskMeta pipeConsensusGroupTaskMeta = new PipeConsensusGroupTaskMeta(0, 1);
+    Map<TConsensusGroupId, PipeConsensusGroupTaskMeta> pipeTasks = new HashMap<>();
+    pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeConsensusGroupTaskMeta);
+    PipeStaticMeta pipeStaticMeta =
+        new PipeStaticMeta(
+            "testPipe", 121, collectorAttributes, processorAttributes, connectorAttributes);
+    PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
+    CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta);
+    pipeInfo.getPipeTaskInfo().createPipe(createPipePlanV2);
+    // Also register a pipe plugin through getPipePluginInfo() before taking the snapshot.
+    CreatePipePluginPlan createPipePluginPlan =
+        new CreatePipePluginPlan(
+            new PipePluginMeta("testPlugin", "org.apache.iotdb.testJar", "testJar", "???"),
+            new Binary("123"));
+    pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan);
+    // Take a snapshot of the populated instance into snapshotDir.
+    pipeInfo.processTakeSnapshot(snapshotDir);
+    // Load the snapshot into a second, newly constructed PipeInfo.
+    PipeInfo pipeInfo1 = new PipeInfo();
+    pipeInfo1.processLoadSnapshot(snapshotDir);
+    // The reloaded instance must match both by string form and by equals().
+    Assert.assertEquals(pipeInfo.toString(), pipeInfo1.toString());
+    Assert.assertEquals(pipeInfo, pipeInfo1);
+  }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
similarity index 93%
copy from confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java
copy to confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
index cda15ec592..729e3a01f7 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
@@ -17,10 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl.pipe;
+package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
 
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
-import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
similarity index 92%
copy from confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
copy to confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
index 02f7c856f2..6135aa0ab6 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
@@ -17,9 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl.pipe;
+package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
 
-import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
similarity index 57%
rename from confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
index cda15ec592..cf151fe595 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java
@@ -17,42 +17,51 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl.pipe;
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
-import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
-import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
-import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.junit.Test;
 
 import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-public class CreatePipePluginProcedureTest {
+public class CreatePipeProcedureV2Test {
   @Test
   public void serializeDeserializeTest() {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
 
-    PipePluginMeta pipePluginMeta =
-        new PipePluginMeta("test", "test.class", "test.jar", "testMD5test");
-    CreatePipePluginProcedure proc =
-        new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3});
+    Map<String, String> collectorAttributes = new HashMap<>();
+    Map<String, String> processorAttributes = new HashMap<>();
+    Map<String, String> connectorAttributes = new HashMap<>();
+    collectorAttributes.put("collector", "org.apache.iotdb.pipe.collector.DefaultCollector");
+    processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor");
+    connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocal.ThriftTransporter");
+
+    CreatePipeProcedureV2 proc =
+        new CreatePipeProcedureV2(
+            new TCreatePipeReq()
+                .setPipeName("testPipe")
+                .setCollectorAttributes(collectorAttributes)
+                .setProcessorAttributes(processorAttributes)
+                .setConnectorAttributes(connectorAttributes));
 
     try {
       proc.serialize(outputStream);
       ByteBuffer buffer =
           ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
-      CreatePipePluginProcedure proc2 =
-          (CreatePipePluginProcedure) ProcedureFactory.getInstance().create(buffer);
+      CreatePipeProcedureV2 proc2 =
+          (CreatePipeProcedureV2) ProcedureFactory.getInstance().create(buffer);
 
       assertEquals(proc, proc2);
-      assertEquals(new Binary(proc.getJarFile()), new Binary(proc2.getJarFile()));
     } catch (Exception e) {
       fail();
     }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2Test.java
similarity index 80%
copy from confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
copy to confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2Test.java
index 02f7c856f2..427d147833 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2Test.java
@@ -17,9 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl.pipe;
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
-import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
@@ -31,20 +30,21 @@ import java.nio.ByteBuffer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-public class DropPipePluginProcedureTest {
+public class DropPipeProcedureV2Test {
   @Test
   public void serializeDeserializeTest() {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
 
-    DropPipePluginProcedure proc = new DropPipePluginProcedure("test");
+    DropPipeProcedureV2 proc = new DropPipeProcedureV2("testPipe");
 
     try {
       proc.serialize(outputStream);
       ByteBuffer buffer =
           ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
-      DropPipePluginProcedure proc2 =
-          (DropPipePluginProcedure) ProcedureFactory.getInstance().create(buffer);
+      DropPipeProcedureV2 proc2 =
+          (DropPipeProcedureV2) ProcedureFactory.getInstance().create(buffer);
+
       assertEquals(proc, proc2);
     } catch (Exception e) {
       fail();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2Test.java
similarity index 80%
copy from confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
copy to confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2Test.java
index 02f7c856f2..2f47718514 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2Test.java
@@ -17,9 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl.pipe;
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
-import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
@@ -31,20 +30,21 @@ import java.nio.ByteBuffer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-public class DropPipePluginProcedureTest {
+public class StartPipeProcedureV2Test {
   @Test
   public void serializeDeserializeTest() {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
 
-    DropPipePluginProcedure proc = new DropPipePluginProcedure("test");
+    StartPipeProcedureV2 proc = new StartPipeProcedureV2("testPipe");
 
     try {
       proc.serialize(outputStream);
       ByteBuffer buffer =
           ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
-      DropPipePluginProcedure proc2 =
-          (DropPipePluginProcedure) ProcedureFactory.getInstance().create(buffer);
+      StartPipeProcedureV2 proc2 =
+          (StartPipeProcedureV2) ProcedureFactory.getInstance().create(buffer);
+
       assertEquals(proc, proc2);
     } catch (Exception e) {
       fail();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
similarity index 80%
rename from confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
rename to confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
index 02f7c856f2..c02417cde1 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
@@ -17,9 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl.pipe;
+package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
-import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
@@ -31,20 +30,21 @@ import java.nio.ByteBuffer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-public class DropPipePluginProcedureTest {
+public class StopPipeProcedureV2Test {
   @Test
   public void serializeDeserializeTest() {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
 
-    DropPipePluginProcedure proc = new DropPipePluginProcedure("test");
+    StopPipeProcedureV2 proc = new StopPipeProcedureV2("testPipe");
 
     try {
       proc.serialize(outputStream);
       ByteBuffer buffer =
           ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
-      DropPipePluginProcedure proc2 =
-          (DropPipePluginProcedure) ProcedureFactory.getInstance().create(buffer);
+      StopPipeProcedureV2 proc2 =
+          (StopPipeProcedureV2) ProcedureFactory.getInstance().create(buffer);
+
       assertEquals(proc, proc2);
     } catch (Exception e) {
       fail();
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
index d112eff23f..c02fdeadb6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
@@ -30,12 +30,12 @@ import java.util.Map;
 public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
 
   protected final Map<String, String> jarNameToMd5Map;
-  protected final Map<String, Integer> jarNameToReferrenceCountMap;
+  protected final Map<String, Integer> jarNameToReferenceCountMap;
 
   public ConfigNodePipePluginMetaKeeper() {
     super();
     jarNameToMd5Map = new HashMap<>();
-    jarNameToReferrenceCountMap = new HashMap<>();
+    jarNameToReferenceCountMap = new HashMap<>();
   }
 
   public synchronized boolean containsJar(String jarName) {
@@ -47,22 +47,22 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
   }
 
   public synchronized void addJarNameAndMd5(String jarName, String md5) {
-    if (jarNameToReferrenceCountMap.containsKey(jarName)) {
-      jarNameToReferrenceCountMap.put(jarName, jarNameToReferrenceCountMap.get(jarName) + 1);
+    if (jarNameToReferenceCountMap.containsKey(jarName)) {
+      jarNameToReferenceCountMap.put(jarName, jarNameToReferenceCountMap.get(jarName) + 1);
     } else {
-      jarNameToReferrenceCountMap.put(jarName, 1);
+      jarNameToReferenceCountMap.put(jarName, 1);
       jarNameToMd5Map.put(jarName, md5);
     }
   }
 
   public synchronized void removeJarNameAndMd5IfPossible(String jarName) {
-    if (jarNameToReferrenceCountMap.containsKey(jarName)) {
-      int count = jarNameToReferrenceCountMap.get(jarName);
+    if (jarNameToReferenceCountMap.containsKey(jarName)) {
+      int count = jarNameToReferenceCountMap.get(jarName);
       if (count == 1) {
-        jarNameToReferrenceCountMap.remove(jarName);
+        jarNameToReferenceCountMap.remove(jarName);
         jarNameToMd5Map.remove(jarName);
       } else {
-        jarNameToReferrenceCountMap.put(jarName, count - 1);
+        jarNameToReferenceCountMap.put(jarName, count - 1);
       }
     }
   }
@@ -72,7 +72,7 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
     for (Map.Entry<String, String> entry : jarNameToMd5Map.entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue(), outputStream);
-      ReadWriteIOUtils.write(jarNameToReferrenceCountMap.get(entry.getKey()), outputStream);
+      ReadWriteIOUtils.write(jarNameToReferenceCountMap.get(entry.getKey()), outputStream);
     }
 
     ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), outputStream);
@@ -90,7 +90,7 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
       final String md5 = ReadWriteIOUtils.readString(inputStream);
       final int count = ReadWriteIOUtils.readInt(inputStream);
       jarNameToMd5Map.put(jarName, md5);
-      jarNameToReferrenceCountMap.put(jarName, count);
+      jarNameToReferenceCountMap.put(jarName, count);
     }
 
     final int pipePluginMetaSize = ReadWriteIOUtils.readInt(inputStream);
@@ -100,9 +100,9 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
     }
   }
 
-  private void clear() {
+  public void clear() {
     pipeNameToPipeMetaMap.clear();
     jarNameToMd5Map.clear();
-    jarNameToReferrenceCountMap.clear();
+    jarNameToReferenceCountMap.clear();
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
index d413c1632f..4417141106 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.plugin.meta;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
 public abstract class PipePluginMetaKeeper {
@@ -49,4 +50,25 @@ public abstract class PipePluginMetaKeeper {
   public boolean containsPipePlugin(String pluginName) {
     return pipeNameToPipeMetaMap.containsKey(pluginName.toUpperCase());
   }
+
+  public void clear() { // Removes every entry from pipeNameToPipeMetaMap.
+    pipeNameToPipeMetaMap.clear();
+  }
+
+  @Override // Equal only to an object of the same runtime class with an equal pipeNameToPipeMetaMap.
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) { // exact class match, not instanceof
+      return false;
+    }
+    PipePluginMetaKeeper that = (PipePluginMetaKeeper) o;
+    return pipeNameToPipeMetaMap.equals(that.pipeNameToPipeMetaMap); // NOTE(review): only this map is compared; confirm subclasses with extra state override equals
+  }
+
+  @Override // Hash is derived from pipeNameToPipeMetaMap only, the same field equals() compares.
+  public int hashCode() {
+    return Objects.hash(pipeNameToPipeMetaMap);
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeConsensusGroupTaskMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeConsensusGroupTaskMeta.java
new file mode 100644
index 0000000000..39bc9d3c5f
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeConsensusGroupTaskMeta.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PipeConsensusGroupTaskMeta {
+  // Holds two values for one consensus group's pipe task: an index (long) and a region leader (int).
+  // TODO: replace it with consensus index
+  private final AtomicLong index = new AtomicLong(0L);
+  private final AtomicInteger regionLeader = new AtomicInteger(0);
+  // Private no-arg constructor, used by deserialize(ByteBuffer); both fields start at 0.
+  private PipeConsensusGroupTaskMeta() {}
+
+  public PipeConsensusGroupTaskMeta(long index, int regionLeader) {
+    this.index.set(index);
+    this.regionLeader.set(regionLeader);
+  }
+
+  public long getIndex() {
+    return index.get();
+  }
+
+  public int getRegionLeader() {
+    return regionLeader.get();
+  }
+
+  public void setIndex(long index) {
+    this.index.set(index);
+  }
+
+  public void setRegionLeader(int regionLeader) {
+    this.regionLeader.set(regionLeader);
+  }
+  // Writes index (long) followed by regionLeader (int); this method adds no length prefix itself.
+  public void serialize(DataOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(index.get(), outputStream);
+    ReadWriteIOUtils.write(regionLeader.get(), outputStream);
+  }
+  // Reads index (long) then regionLeader (int) from the buffer, the order serialize() writes them.
+  public static PipeConsensusGroupTaskMeta deserialize(ByteBuffer byteBuffer) {
+    final PipeConsensusGroupTaskMeta taskMeta = new PipeConsensusGroupTaskMeta();
+    taskMeta.index.set(ReadWriteIOUtils.readLong(byteBuffer));
+    taskMeta.regionLeader.set(ReadWriteIOUtils.readInt(byteBuffer));
+    return taskMeta;
+  }
+  // NOTE(review): presumably expects a length-prefixed payload, which serialize() does not write — confirm with callers.
+  public static PipeConsensusGroupTaskMeta deserialize(InputStream inputStream) throws IOException {
+    return deserialize(
+        ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+  }
+  // Two instances are equal when they have the same class, the same index and the same regionLeader.
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeConsensusGroupTaskMeta that = (PipeConsensusGroupTaskMeta) obj;
+    return index.get() == that.index.get() && regionLeader.get() == that.regionLeader.get();
+  }
+  // Computes index * 31 + regionLeader as a long and narrows it to int; uses the same fields as equals().
+  @Override
+  public int hashCode() {
+    return (int) (index.get() * 31 + regionLeader.get());
+  }
+  // NOTE(review): the label is "PipeTask", not the class name; left unchanged in case the text is relied on.
+  @Override
+  public String toString() {
+    return "PipeTask{" + "index='" + index + '\'' + ", regionLeader='" + regionLeader + '\'' + '}';
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
new file mode 100644
index 0000000000..f75237bbdc
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class PipeMeta {
+  // Pairs a pipe's PipeStaticMeta with its PipeRuntimeMeta and (de)serializes the two together.
+  private final PipeStaticMeta staticMeta;
+  private final PipeRuntimeMeta runtimeMeta;
+
+  public PipeMeta(PipeStaticMeta staticMeta, PipeRuntimeMeta runtimeMeta) {
+    this.staticMeta = staticMeta;
+    this.runtimeMeta = runtimeMeta;
+  }
+
+  public PipeStaticMeta getStaticMeta() {
+    return staticMeta;
+  }
+
+  public PipeRuntimeMeta getRuntimeMeta() {
+    return runtimeMeta;
+  }
+  // Serializes into an in-memory buffer and returns a ByteBuffer wrapping exactly the bytes written.
+  public ByteBuffer serialize() throws IOException {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+    serialize(outputStream);
+    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+  }
+  // Writes staticMeta.serialize() and then runtimeMeta.serialize() to the stream, in that order.
+  public void serialize(DataOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(staticMeta.serialize(), outputStream);
+    ReadWriteIOUtils.write(runtimeMeta.serialize(), outputStream);
+  }
+  // Same two writes in the same order as serialize(DataOutputStream), targeting a FileOutputStream.
+  public void serialize(FileOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(staticMeta.serialize(), outputStream);
+    ReadWriteIOUtils.write(runtimeMeta.serialize(), outputStream);
+  }
+  // Reads a PipeStaticMeta and then a PipeRuntimeMeta from the stream, the order serialize() writes.
+  public static PipeMeta deserialize(FileInputStream fileInputStream) throws IOException {
+    PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(fileInputStream);
+    PipeRuntimeMeta runtimeMeta = PipeRuntimeMeta.deserialize(fileInputStream);
+    return new PipeMeta(staticMeta, runtimeMeta);
+  }
+  // Equal when the other object has the same class and equal staticMeta and runtimeMeta.
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PipeMeta pipeMeta = (PipeMeta) o;
+    return Objects.equals(staticMeta, pipeMeta.staticMeta)
+        && Objects.equals(runtimeMeta, pipeMeta.runtimeMeta);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(staticMeta, runtimeMeta);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeMeta{" + "staticMeta=" + staticMeta + ", runtimeMeta=" + runtimeMeta + '}';
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
new file mode 100644
index 0000000000..b45286abaf
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeMetaKeeper {
+  // Keeps PipeMeta objects keyed by pipe name and can write them to / read them from a file stream.
+  protected final Map<String, PipeMeta> pipeNameToPipeMetaMap;
+
+  public PipeMetaKeeper() {
+    pipeNameToPipeMetaMap = new ConcurrentHashMap<>();
+  }
+  // Stores pipeMeta under pipeName; an existing mapping for that name is replaced (Map.put).
+  public void addPipeMeta(String pipeName, PipeMeta pipeMeta) {
+    pipeNameToPipeMetaMap.put(pipeName, pipeMeta);
+  }
+  // Returns the PipeMeta stored under pipeName, or null when there is no mapping (Map.get).
+  public PipeMeta getPipeMeta(String pipeName) {
+    return pipeNameToPipeMetaMap.get(pipeName);
+  }
+
+  public void removePipeMeta(String pipeName) {
+    pipeNameToPipeMetaMap.remove(pipeName);
+  }
+
+  public boolean containsPipeMeta(String pipeName) {
+    return pipeNameToPipeMetaMap.containsKey(pipeName);
+  }
+
+  public void clear() {
+    this.pipeNameToPipeMetaMap.clear();
+  }
+  // Snapshot layout: entry count, then for each entry the pipe name followed by the serialized PipeMeta.
+  public void processTakeSnapshot(FileOutputStream fileOutputStream) throws IOException {
+    ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), fileOutputStream);
+    for (Map.Entry<String, PipeMeta> entry : pipeNameToPipeMetaMap.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), fileOutputStream);
+      entry.getValue().serialize(fileOutputStream);
+    }
+  }
+  // Discards the current contents, then reads entries in the layout processTakeSnapshot writes.
+  public void processLoadSnapshot(FileInputStream fileInputStream) throws IOException {
+    clear();
+
+    final int size = ReadWriteIOUtils.readInt(fileInputStream);
+    for (int i = 0; i < size; i++) {
+      final String pipeName = ReadWriteIOUtils.readString(fileInputStream);
+      pipeNameToPipeMetaMap.put(pipeName, PipeMeta.deserialize(fileInputStream));
+    }
+  }
+  // Equal when the other object has the same class and an equal pipeNameToPipeMetaMap.
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PipeMetaKeeper that = (PipeMetaKeeper) o;
+    return Objects.equals(pipeNameToPipeMetaMap, that.pipeNameToPipeMetaMap);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(pipeNameToPipeMetaMap);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeMetaKeeper{" + "pipeNameToPipeMetaMap=" + pipeNameToPipeMetaMap + '}';
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
new file mode 100644
index 0000000000..faa01c67a6
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PipeRuntimeMeta {
+
+  private final AtomicReference<PipeStatus> status;
+  private final List<String> exceptionMessages;
+  private final Map<TConsensusGroupId, PipeConsensusGroupTaskMeta> consensusGroupIdToTaskMetaMap;
+
+  public PipeRuntimeMeta() {
+    status = new AtomicReference<>(PipeStatus.STOPPED);
+    exceptionMessages = new LinkedList<>();
+    consensusGroupIdToTaskMetaMap = new ConcurrentHashMap<>();
+  }
+
+  public PipeRuntimeMeta(
+      Map<TConsensusGroupId, PipeConsensusGroupTaskMeta> consensusGroupIdToTaskMetaMap) {
+    status = new AtomicReference<>(PipeStatus.STOPPED);
+    exceptionMessages = new LinkedList<>();
+    this.consensusGroupIdToTaskMetaMap = consensusGroupIdToTaskMetaMap;
+  }
+
+  public AtomicReference<PipeStatus> getStatus() {
+    return status;
+  }
+
+  public List<String> getExceptionMessages() {
+    return exceptionMessages;
+  }
+
+  public Map<TConsensusGroupId, PipeConsensusGroupTaskMeta> getConsensusGroupIdToTaskMetaMap() {
+    return consensusGroupIdToTaskMetaMap;
+  }
+
+  public ByteBuffer serialize() throws IOException {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+    serialize(outputStream);
+    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+  }
+
+  public void serialize(DataOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(status.get().getType(), outputStream);
+
+    // ignore exception messages
+
+    ReadWriteIOUtils.write(consensusGroupIdToTaskMetaMap.size(), outputStream);
+    for (Map.Entry<TConsensusGroupId, PipeConsensusGroupTaskMeta> entry :
+        consensusGroupIdToTaskMetaMap.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey().getId(), outputStream);
+      entry.getValue().serialize(outputStream);
+    }
+  }
+
+  public static PipeRuntimeMeta deserialize(InputStream inputStream) throws IOException {
+    return deserialize(
+        ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+  }
+
+  public static PipeRuntimeMeta deserialize(ByteBuffer byteBuffer) throws IOException {
+    final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
+
+    pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(byteBuffer)));
+
+    // ignore exception messages
+
+    final int size = ReadWriteIOUtils.readInt(byteBuffer);
+    for (int i = 0; i < size; ++i) {
+      pipeRuntimeMeta.consensusGroupIdToTaskMetaMap.put(
+          new TConsensusGroupId(
+              TConsensusGroupType.DataRegion, ReadWriteIOUtils.readInt(byteBuffer)),
+          PipeConsensusGroupTaskMeta.deserialize(byteBuffer));
+    }
+
+    return pipeRuntimeMeta;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PipeRuntimeMeta that = (PipeRuntimeMeta) o;
+    return Objects.equals(status.get().getType(), that.status.get().getType())
+        && exceptionMessages.equals(that.exceptionMessages)
+        && consensusGroupIdToTaskMetaMap.equals(that.consensusGroupIdToTaskMetaMap);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(status, exceptionMessages, consensusGroupIdToTaskMetaMap);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeRuntimeMeta{"
+        + "status="
+        + status
+        + ", exceptionMessages="
+        + exceptionMessages
+        + ", consensusGroupIdToTaskMetaMap="
+        + consensusGroupIdToTaskMetaMap
+        + '}';
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
new file mode 100644
index 0000000000..0165899f74
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Static description of a pipe: its name, its create time and the attribute maps of its
+ * collector, processor and connector.
+ */
+public class PipeStaticMeta {
+
+  private String pipeName;
+  private long createTime;
+
+  private Map<String, String> collectorAttributes = new HashMap<>();
+  private Map<String, String> processorAttributes = new HashMap<>();
+  private Map<String, String> connectorAttributes = new HashMap<>();
+
+  /** Used by {@link #deserialize(ByteBuffer)}, which fills in the fields afterwards. */
+  private PipeStaticMeta() {}
+
+  /**
+   * Creates a static meta. The pipe name is stored upper-cased; the attribute maps are stored as
+   * passed in, without copying.
+   */
+  public PipeStaticMeta(
+      String pipeName,
+      long createTime,
+      Map<String, String> collectorAttributes,
+      Map<String, String> processorAttributes,
+      Map<String, String> connectorAttributes) {
+    this.pipeName = pipeName.toUpperCase();
+    this.createTime = createTime;
+    this.collectorAttributes = collectorAttributes;
+    this.processorAttributes = processorAttributes;
+    this.connectorAttributes = connectorAttributes;
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public long getCreateTime() {
+    return createTime;
+  }
+
+  /** Returns the collector attributes (the internal map, not a copy). */
+  public Map<String, String> getCollectorAttributes() {
+    return collectorAttributes;
+  }
+
+  /** Returns the processor attributes (the internal map, not a copy). */
+  public Map<String, String> getProcessorAttributes() {
+    return processorAttributes;
+  }
+
+  /** Returns the connector attributes (the internal map, not a copy). */
+  public Map<String, String> getConnectorAttributes() {
+    return connectorAttributes;
+  }
+
+  /**
+   * Serializes this meta into a new buffer using {@link #serialize(DataOutputStream)}.
+   *
+   * @return a buffer wrapping the written bytes
+   * @throws IOException if writing fails
+   */
+  public ByteBuffer serialize() throws IOException {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+    serialize(outputStream);
+    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+  }
+
+  /**
+   * Writes the pipe name and create time, then each attribute map (collector, processor,
+   * connector) as an entry count followed by its key/value pairs.
+   *
+   * @param outputStream destination stream
+   * @throws IOException if writing fails
+   */
+  public void serialize(DataOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(pipeName, outputStream);
+    ReadWriteIOUtils.write(createTime, outputStream);
+
+    outputStream.writeInt(collectorAttributes.size());
+    for (Map.Entry<String, String> entry : collectorAttributes.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), outputStream);
+      ReadWriteIOUtils.write(entry.getValue(), outputStream);
+    }
+    outputStream.writeInt(processorAttributes.size());
+    for (Map.Entry<String, String> entry : processorAttributes.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), outputStream);
+      ReadWriteIOUtils.write(entry.getValue(), outputStream);
+    }
+    outputStream.writeInt(connectorAttributes.size());
+    for (Map.Entry<String, String> entry : connectorAttributes.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), outputStream);
+      ReadWriteIOUtils.write(entry.getValue(), outputStream);
+    }
+  }
+
+  /**
+   * Deserializes a meta from the bytes obtained through {@code
+   * ReadWriteIOUtils.readBytesWithSelfDescriptionLength}.
+   *
+   * <p>NOTE(review): {@link #serialize(DataOutputStream)} does not write a length prefix itself,
+   * so the stream presumably has to be framed by the caller - confirm against the writer.
+   *
+   * @throws IOException if reading fails
+   */
+  public static PipeStaticMeta deserialize(InputStream inputStream) throws IOException {
+    return deserialize(
+        ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+  }
+
+  /**
+   * Rebuilds a static meta from the layout written by {@link #serialize(DataOutputStream)}.
+   *
+   * @param byteBuffer buffer positioned at the start of a serialized meta
+   */
+  public static PipeStaticMeta deserialize(ByteBuffer byteBuffer) {
+    final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
+
+    pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
+    pipeStaticMeta.createTime = ReadWriteIOUtils.readLong(byteBuffer);
+
+    int size = byteBuffer.getInt();
+    for (int i = 0; i < size; ++i) {
+      pipeStaticMeta.collectorAttributes.put(
+          ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+    }
+    size = byteBuffer.getInt();
+    for (int i = 0; i < size; ++i) {
+      pipeStaticMeta.processorAttributes.put(
+          ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+    }
+    size = byteBuffer.getInt();
+    for (int i = 0; i < size; ++i) {
+      pipeStaticMeta.connectorAttributes.put(
+          ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+    }
+
+    return pipeStaticMeta;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeStaticMeta that = (PipeStaticMeta) obj;
+    return pipeName.equals(that.pipeName)
+        && createTime == that.createTime
+        && collectorAttributes.equals(that.collectorAttributes)
+        && processorAttributes.equals(that.processorAttributes)
+        && connectorAttributes.equals(that.connectorAttributes);
+  }
+
+  @Override
+  public int hashCode() {
+    return pipeName.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "PipeStaticMeta{"
+        + "pipeName='"
+        + pipeName
+        + '\''
+        + ", createTime="
+        + createTime
+        + ", collectorAttributes="
+        + collectorAttributes
+        + ", processorAttributes="
+        + processorAttributes
+        + ", connectorAttributes="
+        + connectorAttributes
+        + '}';
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStatus.java
similarity index 61%
rename from node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStatus.java
index 04653122c2..74f6c3403f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMetaAccessor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStatus.java
@@ -19,4 +19,35 @@
 
 package org.apache.iotdb.commons.pipe.task.meta;
 
-public class PipeTaskMetaAccessor {}
+/** Status of a pipe; each constant is bound to the byte code returned by {@link #getType()}. */
+public enum PipeStatus {
+  RUNNING((byte) 0),
+  STOPPED((byte) 1),
+  DROPPED((byte) 2),
+  ;
+
+  private final byte type;
+
+  PipeStatus(byte type) {
+    this.type = type;
+  }
+
+  /** Returns the byte code bound to this status. */
+  public byte getType() {
+    return type;
+  }
+
+  /**
+   * Looks up the status whose byte code equals {@code type}.
+   *
+   * @throws IllegalArgumentException if no status uses that code
+   */
+  public static PipeStatus getPipeStatus(byte type) {
+    for (PipeStatus candidate : values()) {
+      if (candidate.type == type) {
+        return candidate;
+      }
+    }
+    throw new IllegalArgumentException("Invalid input: " + type);
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
index ec935e4c94..5b5fcf5e9a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/pipe/SyncOperation.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.commons.sync.pipe;
 
 public enum SyncOperation {
-  // PIPESINK
   CREATE_PIPESINK,
   DROP_PIPESINK,
   // PIPE
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
index 3be5d25f10..d31ea9b98a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
@@ -93,7 +93,7 @@ public class InsertRowStatement extends InsertBaseStatement {
     this.values = new Object[measurements.length];
     this.dataTypes = new TSDataType[measurements.length];
     for (int i = 0; i < dataTypes.length; i++) {
-      // types are not determined, the situation mainly occurs when the plan uses string values
+      // Types are not determined, the situation mainly occurs when the plan uses string values
       // and is forwarded to other nodes
       byte typeNum = (byte) ReadWriteIOUtils.read(buffer);
       if (typeNum == TYPE_RAW_STRING || typeNum == TYPE_NULL) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 56b172bfa0..34db75c702 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -38,14 +38,12 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.commons.sync.pipe.PipeInfo;
 import org.apache.iotdb.commons.sync.pipe.SyncOperation;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.udf.UDFInformation;
@@ -113,7 +111,6 @@ import org.apache.iotdb.db.quotas.DataNodeSpaceQuotaManager;
 import org.apache.iotdb.db.quotas.DataNodeThrottleQuotaManager;
 import org.apache.iotdb.db.service.DataNode;
 import org.apache.iotdb.db.service.RegionMigrateService;
-import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
 import org.apache.iotdb.db.trigger.executor.TriggerFireResult;
 import org.apache.iotdb.db.trigger.service.TriggerManagementService;
@@ -744,13 +741,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
 
   @Override
   public TSStatus createPipeOnDataNode(TCreatePipeOnDataNodeReq req) {
-    try {
-      PipeInfo pipeInfo = PipeInfo.deserializePipeInfo(req.pipeInfo);
-      SyncService.getInstance().addPipe(pipeInfo);
-      return RpcUtils.SUCCESS_STATUS;
-    } catch (PipeException e) {
-      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
-    }
+    throw new NotImplementedException("TODO: createPipeOnDataNode");
   }
 
   @Override
@@ -758,44 +749,18 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     try {
       switch (SyncOperation.values()[req.getOperation()]) {
         case START_PIPE:
-          SyncService.getInstance().startPipe(req.getPipeName());
-          break;
         case STOP_PIPE:
-          SyncService.getInstance().stopPipe(req.getPipeName());
-          break;
         case DROP_PIPE:
-          SyncService.getInstance().dropPipe(req.getPipeName());
-          break;
+          throw new NotImplementedException("TODO: operatePipeOnDataNode");
         default:
           return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
               .setMessage("Unsupported operation.");
       }
-      return RpcUtils.SUCCESS_STATUS;
-    } catch (PipeException e) {
+    } catch (Exception e) {
       return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
     }
   }
 
-  @Override
-  public TSStatus operatePipeOnDataNodeForRollback(TOperatePipeOnDataNodeReq req) {
-    // Operate PIPE on DataNode for rollback, createTime in req is required.
-    switch (SyncOperation.values()[req.getOperation()]) {
-      case START_PIPE:
-        SyncService.getInstance().startPipe(req.getPipeName(), req.getCreateTime());
-        break;
-      case STOP_PIPE:
-        SyncService.getInstance().stopPipe(req.getPipeName(), req.getCreateTime());
-        break;
-      case DROP_PIPE:
-        SyncService.getInstance().dropPipe(req.getPipeName(), req.getCreateTime());
-        break;
-      default:
-        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
-            .setMessage("Unsupported operation.");
-    }
-    return RpcUtils.SUCCESS_STATUS;
-  }
-
   @Override
   public TSStatus executeCQ(TExecuteCQ req) {
 
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index c9558af83d..3b8d18de89 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -356,14 +356,13 @@ struct TCountPathsUsingTemplateResp{
 }
 
 struct TCreatePipeOnDataNodeReq{
-  1: required binary pipeInfo
+  1: required binary pipeMeta
 }
 
 struct TOperatePipeOnDataNodeReq {
     1: required string pipeName
     // ordinal of {@linkplain SyncOperation}
     2: required i8 operation
-    3: optional i64 createTime
 }
 
 // ====================================================
@@ -751,11 +750,6 @@ service IDataNodeRPCService {
   */
   common.TSStatus operatePipeOnDataNode(TOperatePipeOnDataNodeReq req)
 
- /**
-  * Start, stop or drop PIPE on DataNode for rollback
-  */
-  common.TSStatus operatePipeOnDataNodeForRollback(TOperatePipeOnDataNodeReq req)
-
  /**
   * Execute CQ on DataNode
   */