You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/03/18 05:05:11 UTC

[iotdb] branch master updated: [IOTDB-5690] PipePlugin: CreatePipePluginProcedure & DropPipePluginProcedure (#9363)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d0e2b3f108 [IOTDB-5690] PipePlugin: CreatePipePluginProcedure & DropPipePluginProcedure (#9363)
d0e2b3f108 is described below

commit d0e2b3f108ab6caa6389b24923e96788edf42274
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Sat Mar 18 13:05:04 2023 +0800

    [IOTDB-5690] PipePlugin: CreatePipePluginProcedure & DropPipePluginProcedure (#9363)
    
    Signed-off-by: Steve Yurong Su <ro...@apache.org>
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../client/async/AsyncDataNodeClientPool.java      |  14 +
 .../consensus/request/ConfigPhysicalPlan.java      |  16 ++
 .../write/pipe/plugin/CreatePipePluginPlan.java    |   4 +
 .../iotdb/confignode/manager/ConfigManager.java    |   5 +
 .../apache/iotdb/confignode/manager/IManager.java  |   8 +
 .../iotdb/confignode/manager/ProcedureManager.java |  46 ++++
 .../iotdb/confignode/manager/node/NodeManager.java |   9 +
 .../manager/pipe/PipePluginCoordinator.java        | 120 +-------
 .../procedure/env/ConfigNodeProcedureEnv.java      |  31 +++
 .../pipe/plugin/CreatePipePluginProcedure.java     | 306 +++++++++++++++++++++
 .../impl/pipe/plugin/DropPipePluginProcedure.java  | 251 +++++++++++++++++
 .../state/pipe/plugin/CreatePipePluginState.java   |  27 ++
 .../state/pipe/plugin/DropPipePluginState.java     |  27 ++
 .../procedure/store/ProcedureFactory.java          |  12 +
 .../confignode/procedure/store/ProcedureType.java  |   6 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   5 +
 .../impl/pipe/CreatePipePluginProcedureTest.java   |  60 ++++
 .../impl/pipe/DropPipePluginProcedureTest.java     |  53 ++++
 .../confignode1conf/iotdb-common.properties        |   1 +
 .../confignode2conf/iotdb-common.properties        |   1 +
 .../confignode3conf/iotdb-common.properties        |   1 +
 .../resources/conf/iotdb-common.properties         |   9 +
 .../service/PipePluginClassLoaderManager.java      |   2 +-
 .../commons/udf/service/UDFManagementService.java  |   2 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  16 ++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  24 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   7 +
 .../iotdb/db/pipe/agent/PipePluginAgent.java       |  10 +-
 .../java/org/apache/iotdb/db/service/DataNode.java | 110 +++++++-
 .../db/service/ResourcesInformationHolder.java     |  12 +
 .../datanode1conf/iotdb-common.properties          |   3 +-
 .../datanode2conf/iotdb-common.properties          |   3 +-
 .../datanode3conf/iotdb-common.properties          |   3 +-
 .../src/main/thrift/confignode.thrift              |   6 +
 34 files changed, 1087 insertions(+), 123 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index ce07676532..79496235f5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -41,12 +41,14 @@ import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeactivateTemplateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteSchemaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
@@ -203,6 +205,18 @@ public class AsyncDataNodeClientPool {
               (AsyncTSStatusRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
           break;
+        case CREATE_PIPE_PLUGIN:
+          client.createPipePlugin(
+              (TCreatePipePluginInstanceReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
+        case DROP_PIPE_PLUGIN:
+          client.dropPipePlugin(
+              (TDropPipePluginInstanceReq) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
         case MERGE:
         case FULL_MERGE:
           client.merge(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 62d83e48f3..bb3dc7d4e3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -33,6 +33,8 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateS
 import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan;
+import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginTablePlan;
 import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
 import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
@@ -73,6 +75,8 @@ import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelStat
 import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
@@ -414,6 +418,18 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
         case ShowTrail:
           plan = new ShowTrailPlan();
           break;
+        case CreatePipePlugin:
+          plan = new CreatePipePluginPlan();
+          break;
+        case DropPipePlugin:
+          plan = new DropPipePluginPlan();
+          break;
+        case GetPipePluginTable:
+          plan = new GetPipePluginTablePlan();
+          break;
+        case GetPipePluginJar:
+          plan = new GetPipePluginJarPlan();
+          break;
         default:
           throw new IOException("unknown PhysicalPlan configPhysicalPlanType: " + planType);
       }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/plugin/CreatePipePluginPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/plugin/CreatePipePluginPlan.java
index 35fb68272d..ce4984228c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/plugin/CreatePipePluginPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/plugin/CreatePipePluginPlan.java
@@ -34,6 +34,10 @@ public class CreatePipePluginPlan extends ConfigPhysicalPlan {
 
   private Binary jarFile;
 
+  public CreatePipePluginPlan() {
+    super(ConfigPhysicalPlanType.CreatePipePlugin);
+  }
+
   public CreatePipePluginPlan(PipePluginMeta pipePluginMeta, Binary jarFile) {
     super(ConfigPhysicalPlanType.CreatePipePlugin);
     this.pipePluginMeta = pipePluginMeta;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 1f1d825e88..d92883c14f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -927,6 +927,11 @@ public class ConfigManager implements IManager {
     return modelManager;
   }
 
+  @Override
+  public PipeManager getPipeManager() {
+    return pipeManager;
+  }
+
   @Override
   public TSStatus operatePermission(AuthorPlan authorPlan) {
     TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index eaa7ccbe92..75aaaf9a5b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.confignode.manager.cq.CQManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.pipe.PipeManager;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
@@ -187,6 +188,13 @@ public interface IManager {
    */
   CQManager getCQManager();
 
+  /**
+   * Get PipeManager
+   *
+   * @return PipeManager instance
+   */
+  PipeManager getPipeManager();
+
   /**
    * Get RetryFailedTasksThread
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 7084c01aac..63a78d36c1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.exception.sync.PipeException;
 import org.apache.iotdb.commons.model.ModelInformation;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
@@ -51,6 +52,8 @@ import org.apache.iotdb.confignode.procedure.impl.model.DropModelProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
 import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
 import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
@@ -576,6 +579,49 @@ public class ProcedureManager {
     }
   }
 
+  public TSStatus createPipePlugin(PipePluginMeta pipePluginMeta, byte[] jarFile) {
+    final CreatePipePluginProcedure createPipePluginProcedure =
+        new CreatePipePluginProcedure(pipePluginMeta, jarFile);
+    try {
+      if (jarFile != null
+          && new UpdateProcedurePlan(createPipePluginProcedure).getSerializedSize()
+              > planSizeLimit) {
+        return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
+            .setMessage(
+                String.format(
+                    "Fail to create pipe plugin[%s], the size of Jar is too large, you can increase the value of property 'config_node_ratis_log_appender_buffer_size_max' on ConfigNode",
+                    pipePluginMeta.getPluginName()));
+      }
+    } catch (IOException e) {
+      return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
+          .setMessage(e.getMessage());
+    }
+
+    long procedureId = executor.submitProcedure(createPipePluginProcedure);
+    List<TSStatus> statusList = new ArrayList<>();
+    boolean isSucceed =
+        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+    if (isSucceed) {
+      return RpcUtils.SUCCESS_STATUS;
+    } else {
+      return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
+          .setMessage(statusList.get(0).getMessage());
+    }
+  }
+
+  public TSStatus dropPipePlugin(String pluginName) {
+    long procedureId = executor.submitProcedure(new DropPipePluginProcedure(pluginName));
+    List<TSStatus> statusList = new ArrayList<>();
+    boolean isSucceed =
+        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+    if (isSucceed) {
+      return RpcUtils.SUCCESS_STATUS;
+    } else {
+      return new TSStatus(TSStatusCode.DROP_PIPE_PLUGIN_ERROR.getStatusCode())
+          .setMessage(statusList.get(0).getMessage());
+    }
+  }
+
   public TSStatus createPipe(TCreatePipeReq req) {
     try {
       long procedureId = executor.submitProcedure(new CreatePipeProcedure(req));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index fb822a9abb..b502a25e3a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCac
 import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.manager.pipe.PipeManager;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
 import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
@@ -231,6 +232,7 @@ public class NodeManager {
   }
 
   private TRuntimeConfiguration getRuntimeConfiguration() {
+    getPipeManager().getPipePluginCoordinator().getPipePluginInfo().acquirePipePluginInfoLock();
     getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
     getUDFManager().getUdfInfo().acquireUDFTableLock();
 
@@ -241,12 +243,15 @@ public class NodeManager {
           getTriggerManager().getTriggerTable(false).getAllTriggerInformation());
       runtimeConfiguration.setAllUDFInformation(
           getUDFManager().getUDFTable().getAllUDFInformation());
+      runtimeConfiguration.setAllPipeInformation(
+          getPipeManager().getPipePluginCoordinator().getPipePluginTable().getAllPipePluginMeta());
       runtimeConfiguration.setAllTTLInformation(
           DataNodeRegisterResp.convertAllTTLInformation(getClusterSchemaManager().getAllTTLInfo()));
       return runtimeConfiguration;
     } finally {
       getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
       getUDFManager().getUdfInfo().releaseUDFTableLock();
+      getPipeManager().getPipePluginCoordinator().getPipePluginInfo().releasePipePluginInfoLock();
     }
   }
 
@@ -978,6 +983,10 @@ public class NodeManager {
     return configManager.getTriggerManager();
   }
 
+  private PipeManager getPipeManager() {
+    return configManager.getPipeManager();
+  }
+
   private UDFManager getUDFManager() {
     return configManager.getUDFManager();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
index c2cee0cc74..9e45a41dce 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
@@ -19,18 +19,10 @@
 
 package org.apache.iotdb.confignode.manager.pipe;
 
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan;
 import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginTablePlan;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTableResp;
 import org.apache.iotdb.confignode.consensus.response.udf.JarResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
@@ -39,31 +31,18 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
-import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
-import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.utils.Binary;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.List;
-import java.util.Map;
 
 public class PipePluginCoordinator {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipePluginCoordinator.class);
 
-  private final long planSizeLimit =
-      ConfigNodeDescriptor.getInstance()
-              .getConf()
-              .getConfigNodeRatisConsensusLogAppenderBufferSize()
-          - IoTDBConstant.RAFT_LOG_BASIC_SIZE;
-
   private final ConfigManager configManager;
   private final PipePluginInfo pipePluginInfo;
 
@@ -72,108 +51,23 @@ public class PipePluginCoordinator {
     this.pipePluginInfo = pipePluginInfo;
   }
 
-  public TSStatus createPipePlugin(TCreatePipePluginReq req) {
-    pipePluginInfo.acquirePipePluginInfoLock();
-    try {
-      return doCreatePipePlugin(req);
-    } catch (Exception e) {
-      LOGGER.warn(e.getMessage(), e);
-      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
-          .setMessage(e.getMessage());
-    } finally {
-      pipePluginInfo.releasePipePluginInfoLock();
-    }
+  public PipePluginInfo getPipePluginInfo() {
+    return pipePluginInfo;
   }
 
-  private TSStatus doCreatePipePlugin(TCreatePipePluginReq req) throws IOException {
+  public TSStatus createPipePlugin(TCreatePipePluginReq req) {
     final String pluginName = req.getPluginName().toUpperCase();
+    final String className = req.getClassName();
     final String jarName = req.getJarName();
     final String jarMD5 = req.getJarMD5();
-
-    pipePluginInfo.validateBeforeCreatingPipePlugin(pluginName, jarName, jarMD5);
-    final boolean needToSaveJar =
-        pipePluginInfo.isJarNeededToBeSavedWhenCreatingPipePlugin(jarName);
-    LOGGER.info(
-        "Start to create PipePlugin [{}] on Data Nodes, needToSaveJar[{}]",
-        pluginName,
-        needToSaveJar);
-
-    final byte[] jarFile = req.getJarFile();
     final PipePluginMeta pipePluginMeta =
-        new PipePluginMeta(pluginName, req.getClassName(), jarName, jarMD5);
-
-    // data nodes
-    final TSStatus dataNodesStatus =
-        RpcUtils.squashResponseStatusList(
-            createPipePluginOnDataNodes(pipePluginMeta, needToSaveJar ? jarFile : null));
-    if (dataNodesStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return dataNodesStatus;
-    }
+        new PipePluginMeta(pluginName, className, jarName, jarMD5);
 
-    // config nodes
-    final CreatePipePluginPlan createPluginPlan =
-        new CreatePipePluginPlan(pipePluginMeta, needToSaveJar ? new Binary(jarFile) : null);
-    if (needToSaveJar && createPluginPlan.getSerializedSize() > planSizeLimit) {
-      return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
-          .setMessage(
-              String.format(
-                  "Fail to create PipePlugin[%s], the size of Jar is too large, you should increase the value of property 'config_node_ratis_log_appender_buffer_size_max' on ConfigNode",
-                  pluginName));
-    }
-    LOGGER.info("Start to add PipePlugin [{}] to PipePluginTable", pluginName);
-    return configManager.getConsensusManager().write(createPluginPlan).getStatus();
-  }
-
-  private List<TSStatus> createPipePluginOnDataNodes(PipePluginMeta pipePluginMeta, byte[] jarFile)
-      throws IOException {
-    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-        configManager.getNodeManager().getRegisteredDataNodeLocations();
-    final TCreatePipePluginInstanceReq req =
-        new TCreatePipePluginInstanceReq(pipePluginMeta.serialize(), ByteBuffer.wrap(jarFile));
-
-    final AsyncClientHandler<TCreatePipePluginInstanceReq, TSStatus> clientHandler =
-        new AsyncClientHandler<>(DataNodeRequestType.CREATE_PIPE_PLUGIN, req, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-    return clientHandler.getResponseList();
+    return configManager.getProcedureManager().createPipePlugin(pipePluginMeta, req.getJarFile());
   }
 
   public TSStatus dropPipePlugin(String pluginName) {
-    pipePluginInfo.acquirePipePluginInfoLock();
-    try {
-      return doDropPipePlugin(pluginName);
-    } catch (Exception e) {
-      LOGGER.warn(e.getMessage(), e);
-      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
-          .setMessage(e.getMessage());
-    } finally {
-      pipePluginInfo.releasePipePluginInfoLock();
-    }
-  }
-
-  private TSStatus doDropPipePlugin(String pluginName) {
-    pluginName = pluginName.toUpperCase();
-    pipePluginInfo.validateBeforeDroppingPipePlugin(pluginName);
-
-    TSStatus result = RpcUtils.squashResponseStatusList(dropPipePluginOnDataNodes(pluginName));
-    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return result;
-    }
-
-    return configManager
-        .getConsensusManager()
-        .write(new DropPipePluginPlan(pluginName))
-        .getStatus();
-  }
-
-  private List<TSStatus> dropPipePluginOnDataNodes(String pluginName) {
-    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-        configManager.getNodeManager().getRegisteredDataNodeLocations();
-    final TDropPipePluginInstanceReq req = new TDropPipePluginInstanceReq(pluginName, false);
-
-    final AsyncClientHandler<TDropPipePluginInstanceReq, TSStatus> clientHandler =
-        new AsyncClientHandler<>(DataNodeRequestType.DROP_PIPE_PLUGIN, req, dataNodeLocationMap);
-    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-    return clientHandler.getResponseList();
+    return configManager.getProcedureManager().dropPipePlugin(pluginName);
   }
 
   public TGetPipePluginTableResp getPipePluginTable() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 4bb874c4ba..efa53d330d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
@@ -60,8 +61,10 @@ import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
@@ -625,6 +628,34 @@ public class ConfigNodeProcedureEnv {
     return clientHandler.getResponseList();
   }
 
+  public List<TSStatus> createPipePluginOnDataNodes(PipePluginMeta pipePluginMeta, byte[] jarFile)
+      throws IOException {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TCreatePipePluginInstanceReq request =
+        new TCreatePipePluginInstanceReq(pipePluginMeta.serialize(), ByteBuffer.wrap(jarFile));
+
+    final AsyncClientHandler<TCreatePipePluginInstanceReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.CREATE_PIPE_PLUGIN, request, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
+  }
+
+  public List<TSStatus> dropPipePluginOnDataNodes(
+      String pipePluginName, boolean needToDeleteJarFile) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TDropPipePluginInstanceReq request =
+        new TDropPipePluginInstanceReq(pipePluginName, needToDeleteJarFile);
+
+    AsyncClientHandler<TDropPipePluginInstanceReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.DROP_PIPE_PLUGIN, request, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseList();
+  }
+
   public LockQueue getNodeLock() {
     return nodeLock;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
new file mode 100644
index 0000000000..7d2a494fc7
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
+
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.state.pipe.plugin.CreatePipePluginState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Procedure that creates a pipe plugin in four states: {@code LOCK} (acquire the pipe plugin info
+ * lock and validate), {@code CREATE_ON_CONFIG_NODES} (write a {@link CreatePipePluginPlan} through
+ * consensus), {@code CREATE_ON_DATA_NODES} (send the plugin to the data nodes) and {@code UNLOCK}
+ * (release the lock).
+ *
+ * <p>This class extends {@link AbstractNodeProcedure} to make sure that when a {@link
+ * CreatePipePluginProcedure} is executed, the {@link AddConfigNodeProcedure}, {@link
+ * RemoveConfigNodeProcedure} or {@link RemoveDataNodeProcedure} will not be executed at the same
+ * time.
+ */
+public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipePluginState> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(CreatePipePluginProcedure.class);
+
+  /** Once {@code getCycles()} exceeds this value, a non-rollbackable failure is made final. */
+  private static final int RETRY_THRESHOLD = 5;
+
+  /** Meta of the plugin to create; {@code null} after the no-arg constructor. */
+  private PipePluginMeta pipePluginMeta;
+
+  /** Bytes of the plugin jar; sent to the data nodes and, if needed, to the config nodes. */
+  private byte[] jarFile;
+
+  /** Creates an empty procedure whose fields are expected to be filled in by {@link #deserialize}. */
+  public CreatePipePluginProcedure() {
+    super();
+  }
+
+  /**
+   * @param pipePluginMeta meta of the plugin to create
+   * @param jarFile bytes of the plugin jar
+   */
+  public CreatePipePluginProcedure(PipePluginMeta pipePluginMeta, byte[] jarFile) {
+    super();
+    this.pipePluginMeta = pipePluginMeta;
+    this.jarFile = jarFile;
+  }
+
+  /**
+   * Dispatches to the handler of the given state.
+   *
+   * <p>Any exception thrown by a handler is caught here: for a rollback-supported state the
+   * procedure is marked as failed; otherwise the error is logged and the procedure is only marked
+   * as failed once {@code getCycles()} exceeds {@link #RETRY_THRESHOLD}.
+   *
+   * @return the flow returned by the state handler, or {@code HAS_MORE_STATE} after an exception
+   */
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, CreatePipePluginState state)
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+    // Nothing to create without a plugin meta (e.g. a procedure built by the no-arg constructor).
+    if (pipePluginMeta == null) {
+      return Flow.NO_MORE_STATE;
+    }
+
+    try {
+      switch (state) {
+        case LOCK:
+          return executeFromLock(env);
+        case CREATE_ON_CONFIG_NODES:
+          return executeFromCreateOnConfigNodes(env);
+        case CREATE_ON_DATA_NODES:
+          return executeFromCreateOnDataNodes(env);
+        case UNLOCK:
+          return executeFromUnlock(env);
+      }
+    } catch (Exception e) {
+      if (isRollbackSupported(state)) {
+        LOGGER.error("CreatePipePluginProcedure failed in state {}, will rollback", state, e);
+        setFailure(new ProcedureException(e.getMessage()));
+      } else {
+        LOGGER.error(
+            "Retrievable error trying to create pipe plugin [{}], state: {}",
+            pipePluginMeta.getPluginName(),
+            state,
+            e);
+        if (getCycles() > RETRY_THRESHOLD) {
+          LOGGER.error(
+              "Fail to create pipe plugin [{}] after {} retries",
+              pipePluginMeta.getPluginName(),
+              getCycles());
+          setFailure(new ProcedureException(e.getMessage()));
+        }
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Acquires the pipe plugin info lock and validates the plugin name, jar name and jar MD5. On a
+   * validation failure the procedure is marked as failed, the lock is released and the procedure
+   * ends.
+   */
+  private Flow executeFromLock(ConfigNodeProcedureEnv env) {
+    LOGGER.info("CreatePipePluginProcedure: executeFromLock({})", pipePluginMeta.getPluginName());
+    final PipePluginInfo pipePluginInfo =
+        env.getConfigManager().getPipeManager().getPipePluginCoordinator().getPipePluginInfo();
+
+    pipePluginInfo.acquirePipePluginInfoLock();
+
+    try {
+      pipePluginInfo.validateBeforeCreatingPipePlugin(
+          pipePluginMeta.getPluginName(), pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5());
+    } catch (PipeManagementException e) {
+      // Validation rejected the plugin, so end the procedure. The exception is attached to the
+      // log entry because the fixed message text alone does not tell why validation failed.
+      LOGGER.warn(
+          "Pipe plugin {} is already created, end the CreatePipePluginProcedure({})",
+          pipePluginMeta.getPluginName(),
+          pipePluginMeta.getPluginName(),
+          e);
+      setFailure(new ProcedureException(e.getMessage()));
+      pipePluginInfo.releasePipePluginInfoLock();
+      return Flow.NO_MORE_STATE;
+    }
+
+    setNextState(CreatePipePluginState.CREATE_ON_CONFIG_NODES);
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Writes a {@link CreatePipePluginPlan} through the consensus manager. The jar bytes are only
+   * attached when the pipe plugin info reports that the jar needs to be saved.
+   *
+   * @throws PipeManagementException if the consensus write is not successful
+   */
+  private Flow executeFromCreateOnConfigNodes(ConfigNodeProcedureEnv env) {
+    LOGGER.info(
+        "CreatePipePluginProcedure: executeFromCreateOnConfigNodes({})",
+        pipePluginMeta.getPluginName());
+
+    final ConfigManager configNodeManager = env.getConfigManager();
+
+    final boolean needToSaveJar =
+        configNodeManager
+            .getPipeManager()
+            .getPipePluginCoordinator()
+            .getPipePluginInfo()
+            .isJarNeededToBeSavedWhenCreatingPipePlugin(pipePluginMeta.getJarName());
+    final CreatePipePluginPlan createPluginPlan =
+        new CreatePipePluginPlan(pipePluginMeta, needToSaveJar ? new Binary(jarFile) : null);
+
+    final ConsensusWriteResponse response =
+        configNodeManager.getConsensusManager().write(createPluginPlan);
+    if (!response.isSuccessful()) {
+      throw new PipeManagementException(response.getErrorMessage());
+    }
+
+    setNextState(CreatePipePluginState.CREATE_ON_DATA_NODES);
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Sends the plugin meta and jar to the data nodes and moves on to {@code UNLOCK} when the
+   * squashed response status is a success.
+   *
+   * @throws PipeManagementException if the squashed response status is not a success
+   */
+  private Flow executeFromCreateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
+    LOGGER.info(
+        "CreatePipePluginProcedure: executeFromCreateOnDataNodes({})",
+        pipePluginMeta.getPluginName());
+
+    if (RpcUtils.squashResponseStatusList(env.createPipePluginOnDataNodes(pipePluginMeta, jarFile))
+            .getCode()
+        == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      setNextState(CreatePipePluginState.UNLOCK);
+      return Flow.HAS_MORE_STATE;
+    }
+
+    throw new PipeManagementException(
+        String.format(
+            "Failed to create pipe plugin instance [%s] on data nodes",
+            pipePluginMeta.getPluginName()));
+  }
+
+  /** Releases the pipe plugin info lock and ends the procedure. */
+  private Flow executeFromUnlock(ConfigNodeProcedureEnv env) {
+    LOGGER.info("CreatePipePluginProcedure: executeFromUnlock({})", pipePluginMeta.getPluginName());
+
+    env.getConfigManager()
+        .getPipeManager()
+        .getPipePluginCoordinator()
+        .getPipePluginInfo()
+        .releasePipePluginInfoLock();
+
+    return Flow.NO_MORE_STATE;
+  }
+
+  /** Dispatches to the rollback handler of the given state; {@code UNLOCK} has no rollback. */
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, CreatePipePluginState state)
+      throws IOException, InterruptedException, ProcedureException {
+    switch (state) {
+      case LOCK:
+        rollbackFromLock(env);
+        break;
+      case CREATE_ON_CONFIG_NODES:
+        rollbackFromCreateOnConfigNodes(env);
+        break;
+      case CREATE_ON_DATA_NODES:
+        rollbackFromCreateOnDataNodes(env);
+        break;
+    }
+  }
+
+  /** Rolls back {@code LOCK} by releasing the pipe plugin info lock. */
+  private void rollbackFromLock(ConfigNodeProcedureEnv env) {
+    LOGGER.info("CreatePipePluginProcedure: rollbackFromLock({})", pipePluginMeta.getPluginName());
+
+    env.getConfigManager()
+        .getPipeManager()
+        .getPipePluginCoordinator()
+        .getPipePluginInfo()
+        .releasePipePluginInfoLock();
+  }
+
+  /**
+   * Rolls back {@code CREATE_ON_CONFIG_NODES} by writing a {@link DropPipePluginPlan} through the
+   * consensus manager. An unsuccessful write is logged instead of being silently ignored; it is
+   * not rethrown so that the rollback flow itself is unchanged.
+   */
+  private void rollbackFromCreateOnConfigNodes(ConfigNodeProcedureEnv env) {
+    LOGGER.info(
+        "CreatePipePluginProcedure: rollbackFromCreateOnConfigNodes({})",
+        pipePluginMeta.getPluginName());
+
+    final ConsensusWriteResponse response =
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new DropPipePluginPlan(pipePluginMeta.getPluginName()));
+    if (!response.isSuccessful()) {
+      LOGGER.warn(
+          "Failed to rollback pipe plugin [{}] on config nodes: {}",
+          pipePluginMeta.getPluginName(),
+          response.getErrorMessage());
+    }
+  }
+
+  /**
+   * Rolls back {@code CREATE_ON_DATA_NODES} by dropping the plugin on the data nodes (without
+   * requesting jar deletion).
+   *
+   * @throws ProcedureException if the squashed response status is not a success
+   */
+  private void rollbackFromCreateOnDataNodes(ConfigNodeProcedureEnv env) throws ProcedureException {
+    LOGGER.info(
+        "CreatePipePluginProcedure: rollbackFromCreateOnDataNodes({})",
+        pipePluginMeta.getPluginName());
+
+    if (RpcUtils.squashResponseStatusList(
+                env.dropPipePluginOnDataNodes(pipePluginMeta.getPluginName(), false))
+            .getCode()
+        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new ProcedureException(
+          String.format(
+              "Failed to rollback pipe plugin [%s] on data nodes", pipePluginMeta.getPluginName()));
+    }
+  }
+
+  /** Every state except {@code UNLOCK} supports rollback. */
+  @Override
+  protected boolean isRollbackSupported(CreatePipePluginState state) {
+    switch (state) {
+      case LOCK:
+      case CREATE_ON_CONFIG_NODES:
+      case CREATE_ON_DATA_NODES:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /** Maps a state id back to its state; the id is the enum ordinal (see {@link #getStateId}). */
+  @Override
+  protected CreatePipePluginState getState(int stateId) {
+    return CreatePipePluginState.values()[stateId];
+  }
+
+  /** Uses the enum ordinal as state id, so the declaration order of the enum must stay stable. */
+  @Override
+  protected int getStateId(CreatePipePluginState createPipePluginState) {
+    return createPipePluginState.ordinal();
+  }
+
+  @Override
+  protected CreatePipePluginState getInitialState() {
+    return CreatePipePluginState.LOCK;
+  }
+
+  /**
+   * Writes the procedure type code, the superclass state, the plugin meta and finally the jar
+   * bytes.
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.CREATE_PIPE_PLUGIN_PROCEDURE.getTypeCode());
+    super.serialize(stream);
+    pipePluginMeta.serialize(stream);
+    ReadWriteIOUtils.write(ByteBuffer.wrap(jarFile), stream);
+  }
+
+  /**
+   * Reads the superclass state, the plugin meta and the jar bytes, in the order written by {@link
+   * #serialize}. The leading type code is not read here; presumably the caller has already
+   * consumed it.
+   */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    pipePluginMeta = PipePluginMeta.deserialize(byteBuffer);
+    jarFile = ReadWriteIOUtils.readBinary(byteBuffer).getValues();
+  }
+
+  /**
+   * Two procedures are equal when their procedure id, procedure state and plugin meta are equal.
+   * The jar bytes are not compared. A {@code null} plugin meta is tolerated on either side.
+   */
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    }
+    if (!(that instanceof CreatePipePluginProcedure)) {
+      return false;
+    }
+    final CreatePipePluginProcedure thatProcedure = (CreatePipePluginProcedure) that;
+    return thatProcedure.getProcId() == getProcId()
+        && thatProcedure.getState() == this.getState()
+        && java.util.Objects.equals(thatProcedure.pipePluginMeta, pipePluginMeta);
+  }
+
+  /**
+   * Hashes the procedure id and procedure state only. Both are part of {@link #equals}, so equal
+   * procedures always share a hash code; the plugin meta is left out on purpose because its own
+   * {@code hashCode} is not known to be consistent with its {@code equals} from here.
+   */
+  @Override
+  public int hashCode() {
+    return java.util.Objects.hash(getProcId(), getState());
+  }
+
+  @TestOnly
+  public byte[] getJarFile() {
+    return jarFile;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
new file mode 100644
index 0000000000..815782a0ac
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
+
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.state.pipe.plugin.DropPipePluginState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Procedure that drops a pipe plugin in four states: {@code LOCK} (acquire the pipe plugin info
+ * lock and validate), {@code DROP_ON_DATA_NODES}, {@code DROP_ON_CONFIG_NODES} (write a {@link
+ * DropPipePluginPlan} through consensus) and {@code UNLOCK} (release the lock).
+ *
+ * <p>This class extends {@link AbstractNodeProcedure} to make sure that when a {@link
+ * DropPipePluginProcedure} is executed, the {@link AddConfigNodeProcedure}, {@link
+ * RemoveConfigNodeProcedure} or {@link RemoveDataNodeProcedure} will not be executed at the same
+ * time.
+ */
+public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePluginState> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DropPipePluginProcedure.class);
+
+  /** Once {@code getCycles()} exceeds this value, a non-rollbackable failure is made final. */
+  private static final int RETRY_THRESHOLD = 5;
+
+  /** Name of the plugin to drop; {@code null} after the no-arg constructor. */
+  private String pluginName;
+
+  /** Creates an empty procedure whose name is expected to be filled in by {@link #deserialize}. */
+  public DropPipePluginProcedure() {
+    super();
+  }
+
+  /** @param pluginName name of the plugin to drop */
+  public DropPipePluginProcedure(String pluginName) {
+    super();
+    this.pluginName = pluginName;
+  }
+
+  /**
+   * Dispatches to the handler of the given state.
+   *
+   * <p>Any exception thrown by a handler is caught here: for a rollback-supported state the
+   * procedure is marked as failed; otherwise the error is logged and the procedure is only marked
+   * as failed once {@code getCycles()} exceeds {@link #RETRY_THRESHOLD}.
+   *
+   * @return the flow returned by the state handler, or {@code HAS_MORE_STATE} after an exception
+   */
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, DropPipePluginState state)
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+    // Nothing to drop without a plugin name (e.g. a procedure built by the no-arg constructor).
+    if (pluginName == null) {
+      return Flow.NO_MORE_STATE;
+    }
+
+    try {
+      switch (state) {
+        case LOCK:
+          return executeFromLock(env);
+        case DROP_ON_DATA_NODES:
+          return executeFromDropOnDataNodes(env);
+        case DROP_ON_CONFIG_NODES:
+          return executeFromDropOnConfigNodes(env);
+        case UNLOCK:
+          return executeFromUnlock(env);
+      }
+    } catch (Exception e) {
+      if (isRollbackSupported(state)) {
+        LOGGER.warn("DropPipePluginProcedure failed in state {}, will rollback", state, e);
+        setFailure(new ProcedureException(e.getMessage()));
+      } else {
+        LOGGER.error(
+            "Retrievable error trying to drop pipe plugin [{}], state: {}", pluginName, state, e);
+        if (getCycles() > RETRY_THRESHOLD) {
+          LOGGER.error("Fail to drop pipe plugin [{}] after {} retries", pluginName, getCycles());
+          setFailure(new ProcedureException(e.getMessage()));
+        }
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Acquires the pipe plugin info lock and validates that the plugin can be dropped. On a
+   * validation failure the procedure is marked as failed, the lock is released and the procedure
+   * ends. Otherwise a {@link DropPipePluginPlan} is written through the consensus manager.
+   */
+  private Flow executeFromLock(ConfigNodeProcedureEnv env) {
+    LOGGER.info("DropPipePluginProcedure: executeFromLock({})", pluginName);
+    final PipePluginInfo pipePluginInfo =
+        env.getConfigManager().getPipeManager().getPipePluginCoordinator().getPipePluginInfo();
+
+    pipePluginInfo.acquirePipePluginInfoLock();
+
+    try {
+      pipePluginInfo.validateBeforeDroppingPipePlugin(pluginName);
+    } catch (PipeManagementException e) {
+      // Validation rejected the drop (e.g. the plugin does not exist), so end the procedure.
+      // Use a fixed format string and attach the exception instead of using its message as the
+      // format, so that the stack trace is kept and braces in the message are not interpreted.
+      LOGGER.warn(
+          "Validation before dropping pipe plugin [{}] failed, end the DropPipePluginProcedure",
+          pluginName,
+          e);
+      setFailure(new ProcedureException(e.getMessage()));
+      pipePluginInfo.releasePipePluginInfoLock();
+      return Flow.NO_MORE_STATE;
+    }
+
+    // NOTE(review): the same plan is written again in DROP_ON_CONFIG_NODES, and its response is
+    // ignored here - confirm that writing it in the LOCK state is intended.
+    env.getConfigManager().getConsensusManager().write(new DropPipePluginPlan(pluginName));
+
+    setNextState(DropPipePluginState.DROP_ON_DATA_NODES);
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Drops the plugin on the data nodes (without requesting jar deletion) and moves on to {@code
+   * DROP_ON_CONFIG_NODES} when the squashed response status is a success.
+   *
+   * @throws PipeManagementException if the squashed response status is not a success
+   */
+  private Flow executeFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
+    LOGGER.info("DropPipePluginProcedure: executeFromDropOnDataNodes({})", pluginName);
+
+    if (RpcUtils.squashResponseStatusList(env.dropPipePluginOnDataNodes(pluginName, false))
+            .getCode()
+        == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      setNextState(DropPipePluginState.DROP_ON_CONFIG_NODES);
+      return Flow.HAS_MORE_STATE;
+    }
+
+    throw new PipeManagementException(
+        String.format("Failed to drop pipe plugin %s on data nodes", pluginName));
+  }
+
+  /** Writes a {@link DropPipePluginPlan} through the consensus manager; the response is unused. */
+  private Flow executeFromDropOnConfigNodes(ConfigNodeProcedureEnv env) {
+    LOGGER.info("DropPipePluginProcedure: executeFromDropOnConfigNodes({})", pluginName);
+
+    env.getConfigManager().getConsensusManager().write(new DropPipePluginPlan(pluginName));
+
+    setNextState(DropPipePluginState.UNLOCK);
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /** Releases the pipe plugin info lock and ends the procedure. */
+  private Flow executeFromUnlock(ConfigNodeProcedureEnv env) {
+    LOGGER.info("DropPipePluginProcedure: executeFromUnlock({})", pluginName);
+
+    env.getConfigManager()
+        .getPipeManager()
+        .getPipePluginCoordinator()
+        .getPipePluginInfo()
+        .releasePipePluginInfoLock();
+
+    return Flow.NO_MORE_STATE;
+  }
+
+  /** Dispatches to the rollback handler of the given state; {@code UNLOCK} has no rollback. */
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, DropPipePluginState state)
+      throws IOException, InterruptedException, ProcedureException {
+    switch (state) {
+      case LOCK:
+        rollbackFromLock(env);
+        break;
+      case DROP_ON_DATA_NODES:
+        rollbackFromDropOnDataNodes(env);
+        break;
+      case DROP_ON_CONFIG_NODES:
+        rollbackFromDropOnConfigNodes(env);
+        break;
+    }
+  }
+
+  /** Rolls back {@code LOCK} by releasing the pipe plugin info lock. */
+  private void rollbackFromLock(ConfigNodeProcedureEnv env) {
+    LOGGER.info("DropPipePluginProcedure: rollbackFromLock({})", pluginName);
+
+    env.getConfigManager()
+        .getPipeManager()
+        .getPipePluginCoordinator()
+        .getPipePluginInfo()
+        .releasePipePluginInfoLock();
+  }
+
+  /** Rollback of {@code DROP_ON_DATA_NODES}: currently only logs. */
+  private void rollbackFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
+    LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnDataNodes({})", pluginName);
+
+    // do nothing but wait for rolling back to the previous state: LOCK
+    // TODO: we should drop the pipe plugin on data nodes
+  }
+
+  /** Rollback of {@code DROP_ON_CONFIG_NODES}: currently only logs. */
+  private void rollbackFromDropOnConfigNodes(ConfigNodeProcedureEnv env) {
+    LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnConfigNodes({})", pluginName);
+
+    // do nothing but wait for rolling back to the previous state: DROP_ON_DATA_NODES
+    // TODO: we should drop the pipe plugin on config nodes
+  }
+
+  /** Every state except {@code UNLOCK} supports rollback. */
+  @Override
+  protected boolean isRollbackSupported(DropPipePluginState state) {
+    switch (state) {
+      case LOCK:
+      case DROP_ON_DATA_NODES:
+      case DROP_ON_CONFIG_NODES:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /** Maps a state id back to its state; the id is the enum ordinal (see {@link #getStateId}). */
+  @Override
+  protected DropPipePluginState getState(int stateId) {
+    return DropPipePluginState.values()[stateId];
+  }
+
+  /** Uses the enum ordinal as state id, so the declaration order of the enum must stay stable. */
+  @Override
+  protected int getStateId(DropPipePluginState dropPipePluginState) {
+    return dropPipePluginState.ordinal();
+  }
+
+  @Override
+  protected DropPipePluginState getInitialState() {
+    return DropPipePluginState.LOCK;
+  }
+
+  /** Writes the procedure type code, the superclass state and finally the plugin name. */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.DROP_PIPE_PLUGIN_PROCEDURE.getTypeCode());
+    super.serialize(stream);
+    ReadWriteIOUtils.write(pluginName, stream);
+  }
+
+  /**
+   * Reads the superclass state and the plugin name, in the order written by {@link #serialize}.
+   * The leading type code is not read here; presumably the caller has already consumed it.
+   */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    pluginName = ReadWriteIOUtils.readString(byteBuffer);
+  }
+
+  /**
+   * Two procedures are equal when their procedure id, procedure state and plugin name are equal.
+   * A {@code null} plugin name is tolerated on either side.
+   */
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    }
+    if (!(that instanceof DropPipePluginProcedure)) {
+      return false;
+    }
+    final DropPipePluginProcedure thatProcedure = (DropPipePluginProcedure) that;
+    return thatProcedure.getProcId() == getProcId()
+        && thatProcedure.getState() == getState()
+        && java.util.Objects.equals(thatProcedure.pluginName, pluginName);
+  }
+
+  /** Hashes exactly the fields compared by {@link #equals}. */
+  @Override
+  public int hashCode() {
+    return java.util.Objects.hash(getProcId(), getState(), pluginName);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/plugin/CreatePipePluginState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/plugin/CreatePipePluginState.java
new file mode 100644
index 0000000000..81b7992cff
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/plugin/CreatePipePluginState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.state.pipe.plugin;
+
+/**
+ * States of CreatePipePluginProcedure, in execution order.
+ *
+ * <p>The procedure converts states to and from ids via {@code ordinal()} and {@code values()},
+ * so the declaration order must not be changed.
+ */
+public enum CreatePipePluginState {
+  // acquire the pipe plugin info lock and validate the plugin to create
+  LOCK,
+  // write the CreatePipePluginPlan through the consensus manager
+  CREATE_ON_CONFIG_NODES,
+  // send the plugin meta and jar to the data nodes
+  CREATE_ON_DATA_NODES,
+  // release the pipe plugin info lock
+  UNLOCK
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/plugin/DropPipePluginState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/plugin/DropPipePluginState.java
new file mode 100644
index 0000000000..189606307d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/plugin/DropPipePluginState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.state.pipe.plugin;
+
+/**
+ * States of DropPipePluginProcedure, in execution order.
+ *
+ * <p>The procedure converts states to and from ids via {@code ordinal()} and {@code values()},
+ * so the declaration order must not be changed.
+ */
+public enum DropPipePluginState {
+  // acquire the pipe plugin info lock, validate the drop and write the DropPipePluginPlan
+  LOCK,
+  // drop the plugin on the data nodes
+  DROP_ON_DATA_NODES,
+  // write the DropPipePluginPlan through the consensus manager
+  DROP_ON_CONFIG_NODES,
+  // release the pipe plugin info lock
+  UNLOCK
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 48a4cfd997..42a421891f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.confignode.procedure.impl.model.DropModelProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
 import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
 import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
 import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
@@ -117,6 +119,12 @@ public class ProcedureFactory implements IProcedureFactory {
       case DROP_MODEL_PROCEDURE:
         procedure = new DropModelProcedure();
         break;
+      case CREATE_PIPE_PLUGIN_PROCEDURE:
+        procedure = new CreatePipePluginProcedure();
+        break;
+      case DROP_PIPE_PLUGIN_PROCEDURE:
+        procedure = new DropPipePluginProcedure();
+        break;
       default:
         LOGGER.error("unknown Procedure type: " + typeCode);
         throw new IOException("unknown Procedure type: " + typeCode);
@@ -158,6 +166,10 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.DEACTIVATE_TEMPLATE_PROCEDURE;
     } else if (procedure instanceof UnsetTemplateProcedure) {
       return ProcedureType.UNSET_TEMPLATE_PROCEDURE;
+    } else if (procedure instanceof CreatePipePluginProcedure) {
+      return ProcedureType.CREATE_PIPE_PLUGIN_PROCEDURE;
+    } else if (procedure instanceof DropPipePluginProcedure) {
+      return ProcedureType.DROP_PIPE_PLUGIN_PROCEDURE;
     }
     return null;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index a5c9d33eb9..c5d6054646 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -59,7 +59,11 @@ public enum ProcedureType {
 
   /** Ml Model */
   CREATE_MODEL_PROCEDURE((short) 800),
-  DROP_MODEL_PROCEDURE((short) 801);
+  DROP_MODEL_PROCEDURE((short) 801),
+
+  /** Pipe Plugin */
+  CREATE_PIPE_PLUGIN_PROCEDURE((short) 900),
+  DROP_PIPE_PLUGIN_PROCEDURE((short) 901);
 
   private final short typeCode;
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index ffa2c3749f..da9a7cd5e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -708,6 +708,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return configManager.getPipePluginTable();
   }
 
+  /** Thin RPC entry point: forwards the request unchanged to {@code configManager}. */
+  @Override
+  public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) throws TException {
+    return configManager.getPipePluginJar(req);
+  }
+
   @Override
   public TSStatus merge() throws TException {
     return configManager.merge();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java
new file mode 100644
index 0000000000..cda15ec592
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe;
+
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Serialization round-trip test for {@link CreatePipePluginProcedure}. */
+public class CreatePipePluginProcedureTest {
+  /**
+   * Serializes a procedure, recreates it through {@link ProcedureFactory} and checks that both the
+   * procedure and its jar bytes survive the round trip.
+   */
+  @Test
+  public void serializeDeserializeTest() {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+    PipePluginMeta pipePluginMeta =
+        new PipePluginMeta("test", "test.class", "test.jar", "testMD5test");
+    CreatePipePluginProcedure proc =
+        new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3});
+
+    try {
+      proc.serialize(outputStream);
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+      CreatePipePluginProcedure proc2 =
+          (CreatePipePluginProcedure) ProcedureFactory.getInstance().create(buffer);
+
+      assertEquals(proc, proc2);
+      // equals() does not compare the jar bytes, so they are checked separately
+      assertEquals(new Binary(proc.getJarFile()), new Binary(proc2.getJarFile()));
+    } catch (Exception e) {
+      // Report the cause; a bare fail() would hide why the round trip broke.
+      fail("Unexpected exception during serialize/deserialize round trip: " + e);
+    }
+  }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
new file mode 100644
index 0000000000..02f7c856f2
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe;
+
+import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Round-trip serialization test for {@link DropPipePluginProcedure}. */
+public class DropPipePluginProcedureTest {
+  /**
+   * Serializes a procedure, rebuilds it through {@link ProcedureFactory} and checks that the
+   * rebuilt procedure equals the original.
+   */
+  @Test
+  public void serializeDeserializeTest() {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+    DropPipePluginProcedure proc = new DropPipePluginProcedure("test");
+
+    try {
+      proc.serialize(outputStream);
+      // only the first size() bytes of the backing buffer hold serialized data
+      ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+      DropPipePluginProcedure proc2 =
+          (DropPipePluginProcedure) ProcedureFactory.getInstance().create(buffer);
+      assertEquals(proc, proc2);
+    } catch (Exception e) {
+      // report the cause; a bare fail() hides why the round trip broke
+      fail("Serialize/deserialize round trip threw: " + e);
+    }
+  }
+}
diff --git a/confignode/src/test/resources/confignode1conf/iotdb-common.properties b/confignode/src/test/resources/confignode1conf/iotdb-common.properties
index bc77b25874..8981b21285 100644
--- a/confignode/src/test/resources/confignode1conf/iotdb-common.properties
+++ b/confignode/src/test/resources/confignode1conf/iotdb-common.properties
@@ -24,4 +24,5 @@ schema_replication_factor=3
 data_replication_factor=3
 udf_lib_dir=target/confignode1/ext/udf
 trigger_lib_dir=target/confignode1/ext/trigger
+pipe_lib_dir=target/confignode1/ext/pipe
 config_node_ratis_log_appender_buffer_size_max = 14194304
\ No newline at end of file
diff --git a/confignode/src/test/resources/confignode2conf/iotdb-common.properties b/confignode/src/test/resources/confignode2conf/iotdb-common.properties
index 1e431a85af..a9789fedab 100644
--- a/confignode/src/test/resources/confignode2conf/iotdb-common.properties
+++ b/confignode/src/test/resources/confignode2conf/iotdb-common.properties
@@ -24,4 +24,5 @@ schema_replication_factor=3
 data_replication_factor=3
 udf_lib_dir=target/confignode2/ext/udf
 trigger_lib_dir=target/confignode2/ext/trigger
+pipe_lib_dir=target/confignode2/ext/pipe
 config_node_ratis_log_appender_buffer_size_max = 14194304
\ No newline at end of file
diff --git a/confignode/src/test/resources/confignode3conf/iotdb-common.properties b/confignode/src/test/resources/confignode3conf/iotdb-common.properties
index e784ec66d3..6a95388bd7 100644
--- a/confignode/src/test/resources/confignode3conf/iotdb-common.properties
+++ b/confignode/src/test/resources/confignode3conf/iotdb-common.properties
@@ -24,4 +24,5 @@ schema_replication_factor=3
 data_replication_factor=3
 udf_lib_dir=target/confignode3/ext/udf
 trigger_lib_dir=target/confignode3/ext/trigger
+pipe_lib_dir=target/confignode3/ext/pipe
 config_node_ratis_log_appender_buffer_size_max = 14194304
\ No newline at end of file
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 14ff363c31..0ac0c6f4ce 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -892,6 +892,15 @@ cluster_name=defaultCluster
 ### Continuous Query Configuration
 ####################
 
+# Uncomment the following field to configure the pipe lib directory.
+# For Windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
+# absolute. Otherwise, it is relative.
+# pipe_lib_dir=ext\\pipe
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# pipe_lib_dir=ext/pipe
+
 # The number of threads in the scheduled thread pool that submit continuous query tasks periodically
 # Datatype: int
 # continuous_query_submit_thread_count=2
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
index 3766af8bd1..4151af3bc2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
@@ -89,7 +89,7 @@ public class PipePluginClassLoaderManager implements IService {
 
   private static PipePluginClassLoaderManager INSTANCE = null;
 
-  public static synchronized PipePluginClassLoaderManager getInstance(String libRoot)
+  public static synchronized PipePluginClassLoaderManager setupAndGetInstance(String libRoot)
       throws IOException {
     if (INSTANCE == null) {
       INSTANCE = new PipePluginClassLoaderManager(libRoot);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
index fedfa1c3d0..fbdf8684e9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
@@ -168,7 +168,7 @@ public class UDFManagementService {
                         UDFExecutableManager.getInstance().getInstallDir()
                             + File.separator
                             + udfInformation.getJarName())));
-        // save the md5 in a txt under trigger temporary lib
+        // save the md5 in a txt under UDF temporary lib
         UDFExecutableManager.getInstance()
             .saveTextAsFileUnderTemporaryRoot(existedMd5, md5FilePath);
       } catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index a0673904c1..83d295f473 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -1467,6 +1467,22 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  /**
+   * Fetches pipe plugin jars from the ConfigNode, making at most {@code RETRY_NUM} attempts.
+   *
+   * <p>A response is returned as soon as {@code updateConfigNodeLeader} returns {@code false} for
+   * its status. Otherwise, or when the call throws a {@link TException} (which also clears the
+   * cached leader), the client waits, reconnects and tries again.
+   *
+   * @param req the request listing the jars to fetch
+   * @return the response of the first attempt that does not require a retry
+   * @throws TException if no attempt produced a returnable response
+   */
+  @Override
+  public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) throws TException {
+    for (int attempt = 0; attempt < RETRY_NUM; attempt++) {
+      try {
+        final TGetJarInListResp response = client.getPipePluginJar(req);
+        final boolean retryRequired = updateConfigNodeLeader(response.getStatus());
+        if (!retryRequired) {
+          return response;
+        }
+      } catch (TException e) {
+        // forget the cached leader before reconnecting
+        configLeader = null;
+      }
+      waitAndReconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 47917ace72..f4ea5ba157 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -271,6 +271,13 @@ public class IoTDBConfig {
   private String triggerTemporaryLibDir =
       triggerDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
 
+  /** External lib directory for Pipe Plugin, stores user-defined JAR files */
+  private String pipeDir =
+      IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.PIPE_FOLDER_NAME;
+
+  /** External temporary lib directory for storing downloaded pipe plugin JAR files */
+  private String pipeTemporaryLibDir = pipeDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
+
   /** External lib directory for ext Pipe plugins, stores user-defined JAR files */
   private String extPipeDir =
       IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.EXT_PIPE_FOLDER_NAME;
@@ -1412,6 +1419,23 @@ public class IoTDBConfig {
     this.triggerTemporaryLibDir = triggerDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
   }
 
+  /** Returns the external lib directory for pipe plugins. */
+  public String getPipeDir() {
+    return pipeDir;
+  }
+
+  /**
+   * Sets the external lib directory for pipe plugins and re-derives the temporary lib directory
+   * from it.
+   *
+   * @param pipeDir the new pipe plugin lib directory
+   */
+  public void setPipeDir(String pipeDir) {
+    this.pipeDir = pipeDir;
+    // the temporary lib directory is a sub-folder of pipeDir, so it must follow the change
+    updatePipeTemporaryLibDir();
+  }
+
+  /** Returns the external temporary lib directory for pipe plugin JAR files. */
+  public String getPipeTemporaryLibDir() {
+    return pipeTemporaryLibDir;
+  }
+
+  /**
+   * Recomputes the temporary lib directory as the {@code IoTDBConstant.TMP_FOLDER_NAME} sub-folder
+   * of the current pipe plugin lib directory.
+   */
+  public void updatePipeTemporaryLibDir() {
+    this.pipeTemporaryLibDir = pipeDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
+  }
+
   public String getMqttDir() {
     return mqttDir;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6bd05d6f86..26e0544248 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1006,6 +1006,9 @@ public class IoTDBDescriptor {
     // CQ
     loadCQProps(properties);
 
+    // Pipe
+    loadPipeProps(properties);
+
     // cluster
     loadClusterProps(properties);
 
@@ -1787,6 +1790,10 @@ public class IoTDBDescriptor {
                 Integer.toString(conf.getTriggerForwardMQTTPoolSize()))));
   }
 
+  private void loadPipeProps(Properties properties) {
+    conf.setPipeDir(properties.getProperty("pipe_lib_dir", conf.getPipeDir()));
+  }
+
   private void loadCQProps(Properties properties) {
     conf.setContinuousQueryThreadNum(
         Integer.parseInt(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
index d820441dac..dc0565d4a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
@@ -93,7 +93,15 @@ public class PipePluginAgent {
     }
   }
 
-  private void doRegister(PipePluginMeta pipePluginMeta) throws PipeManagementException {
+  /**
+   * Register a PipePlugin to the system without any meta checks. The PipePlugin will be loaded by
+   * the PipePluginClassLoader and its instance will be created to ensure that it can be loaded.
+   *
+   * @param pipePluginMeta the meta information of the PipePlugin
+   * @throws PipeManagementException if the PipePlugin can not be loaded or its instance can not be
+   *     created
+   */
+  public void doRegister(PipePluginMeta pipePluginMeta) throws PipeManagementException {
     final String pluginName = pipePluginMeta.getPluginName();
     final String className = pipePluginMeta.getClassName();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 7747bcdd08..c86e919e6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -30,6 +30,9 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.service.metric.MetricService;
@@ -67,6 +70,7 @@ import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
 import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
+import org.apache.iotdb.db.pipe.agent.PipePluginAgent;
 import org.apache.iotdb.db.protocol.rest.RestService;
 import org.apache.iotdb.db.service.metrics.DataNodeMetricsHelper;
 import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
@@ -80,6 +84,7 @@ import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.utils.WALMode;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.utils.InternalReporterType;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.udf.api.exception.UDFManagementException;
 
@@ -312,7 +317,9 @@ public class DataNode implements DataNodeMBean {
    *
    * <p>4. All trigger information
    *
-   * <p>5. All TTL information
+   * <p>5. All Pipe information
+   *
+   * <p>6. All TTL information
    */
   private void storeRuntimeConfigurations(
       List<TConfigNodeLocation> configNodeLocations, TRuntimeConfiguration runtimeConfiguration) {
@@ -333,6 +340,9 @@ public class DataNode implements DataNodeMBean {
     /* Store triggerInformationList */
     getTriggerInformationList(runtimeConfiguration.getAllTriggerInformation());
 
+    /* Store pipeInformationList */
+    getPipeInformationList(runtimeConfiguration.getAllPipeInformation());
+
     /* Store ttl information */
     StorageEngine.getInstance().updateTTLInfo(runtimeConfiguration.getAllTTLInformation());
   }
@@ -451,6 +461,7 @@ public class DataNode implements DataNodeMBean {
   private void prepareResources() throws StartupException {
     prepareUDFResources();
     prepareTriggerResources();
+    preparePipePluginResources();
   }
 
   /** register services and set up DataNode */
@@ -638,7 +649,7 @@ public class DataNode implements DataNodeMBean {
       getJarOfUDFs(curList);
     }
 
-    // create instances of triggers and do registration
+    // create instances of udf and do registration
     try {
       for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
         UDFManagementService.getInstance().doRegister(udfInformation);
@@ -678,7 +689,7 @@ public class DataNode implements DataNodeMBean {
     List<UDFInformation> res = new ArrayList<>();
     for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
       if (udfInformation.isUsingURI()) {
-        // jar does not exist, add current triggerInformation to list
+        // jar does not exist, add current udfInformation to list
         if (!UDFExecutableManager.getInstance()
             .hasFileUnderInstallDir(udfInformation.getJarName())) {
           res.add(udfInformation);
@@ -821,6 +832,99 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
+  /**
+   * Prepares pipe plugin resources: sets up the pipe plugin managers, fetches the jars this node
+   * still needs from the config node, then registers every pipe plugin held in the resources
+   * information holder.
+   *
+   * @throws StartupException if the managers cannot be set up, a jar cannot be fetched or saved, or
+   *     a plugin fails to register
+   */
+  private void preparePipePluginResources() throws StartupException {
+    initPipePluginRelatedInstance();
+    if (resourcesInformationHolder.getPipePluginMetaList() == null
+        || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) {
+      return;
+    }
+
+    // get jars from config node, at most getJarNumOfOneRpc() jars per RPC
+    List<PipePluginMeta> pipePluginNeedJarList = getJarListForPipePlugin();
+    int index = 0;
+    while (index < pipePluginNeedJarList.size()) {
+      List<PipePluginMeta> curList = new ArrayList<>();
+      int offset = 0;
+      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
+          && index + offset < pipePluginNeedJarList.size()) {
+        curList.add(pipePluginNeedJarList.get(index + offset));
+        offset++;
+      }
+      // advance by exactly the number of metas batched; advancing by offset + 1 would skip the
+      // meta right after each batch, so its jar would never be fetched
+      index += offset;
+      getJarOfPipePlugins(curList);
+    }
+
+    // create instances of pipe plugins and do registration
+    try {
+      for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
+        PipePluginAgent.getInstance().doRegister(meta);
+      }
+    } catch (Exception e) {
+      throw new StartupException(e);
+    }
+  }
+
+  private void initPipePluginRelatedInstance() throws StartupException {
+    try {
+      PipePluginExecutableManager.setupAndGetInstance(
+          config.getPipeTemporaryLibDir(), config.getPipeDir());
+      PipePluginClassLoaderManager.setupAndGetInstance(config.getPipeDir());
+    } catch (IOException e) {
+      throw new StartupException(e);
+    }
+  }
+
+  private List<PipePluginMeta> getJarListForPipePlugin() {
+    List<PipePluginMeta> res = new ArrayList<>();
+    for (PipePluginMeta pipePluginMeta : resourcesInformationHolder.getPipePluginMetaList()) {
+      // If jar does not exist, add current pipePluginMeta to list
+      if (!PipePluginExecutableManager.getInstance()
+          .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
+        res.add(pipePluginMeta);
+      } else {
+        try {
+          // local jar has conflicts with jar on config node, add current pipePluginMeta to list
+          if (!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
+            res.add(pipePluginMeta);
+          }
+        } catch (PipeManagementException e) {
+          res.add(pipePluginMeta);
+        }
+      }
+    }
+    return res;
+  }
+
+  /**
+   * Fetches the jars of the given pipe plugins from the config node in a single RPC and saves each
+   * one to the install dir under the jar name of its meta.
+   *
+   * @param pipePluginMetaList the metas whose jars should be fetched; the i-th jar of the response
+   *     is saved for the i-th meta
+   * @throws StartupException if the config node reports an error, returns fewer jars than
+   *     requested, or the jars cannot be fetched or saved
+   */
+  private void getJarOfPipePlugins(List<PipePluginMeta> pipePluginMetaList)
+      throws StartupException {
+    try (ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      List<String> jarNameList =
+          pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList());
+      TGetJarInListResp resp = configNodeClient.getPipePluginJar(new TGetJarInListReq(jarNameList));
+      if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+        throw new StartupException("Failed to get pipe plugin jar from config node.");
+      }
+      List<ByteBuffer> jarList = resp.getJarList();
+      // every requested jar needs a matching entry, otherwise jarList.get(i) below would fail
+      // with an unchecked exception instead of a StartupException
+      if (jarList == null || jarList.size() < pipePluginMetaList.size()) {
+        throw new StartupException(
+            "Failed to get pipe plugin jar from config node: expected "
+                + pipePluginMetaList.size()
+                + " jar(s) but received "
+                + (jarList == null ? 0 : jarList.size())
+                + ".");
+      }
+      for (int i = 0; i < pipePluginMetaList.size(); i++) {
+        PipePluginExecutableManager.getInstance()
+            .saveToInstallDir(jarList.get(i), pipePluginMetaList.get(i).getJarName());
+      }
+    } catch (IOException | TException | ClientManagerException e) {
+      throw new StartupException(e);
+    }
+  }
+
+  private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
+    if (allPipeInformation != null && !allPipeInformation.isEmpty()) {
+      List<PipePluginMeta> list = new ArrayList<>();
+      for (ByteBuffer pipeInformationByteBuffer : allPipeInformation) {
+        list.add(PipePluginMeta.deserialize(pipeInformationByteBuffer));
+      }
+      resourcesInformationHolder.setPipePluginMetaList(list);
+    }
+  }
+
   private void initSchemaEngine() {
     long time = System.currentTimeMillis();
     SchemaEngine.getInstance().init();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java b/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
index f26436c1d3..e650ca4856 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.udf.UDFInformation;
 
@@ -33,6 +34,9 @@ public class ResourcesInformationHolder {
   /** store the list when registering in config node for preparing trigger related resources */
   private List<TriggerInformation> triggerInformationList;
 
+  /** store the list when registering in config node for preparing pipe plugin related resources */
+  private List<PipePluginMeta> pipePluginMetaList;
+
   public static int getJarNumOfOneRpc() {
     return JAR_NUM_OF_ONE_RPC;
   }
@@ -52,4 +56,12 @@ public class ResourcesInformationHolder {
   public void setTriggerInformationList(List<TriggerInformation> triggerInformationList) {
     this.triggerInformationList = triggerInformationList;
   }
+
+  /** Returns the stored pipe plugin meta list; null if none has been set. */
+  public List<PipePluginMeta> getPipePluginMetaList() {
+    return pipePluginMetaList;
+  }
+
+  /**
+   * Stores the pipe plugin meta list used for preparing pipe plugin related resources.
+   *
+   * @param pipePluginMetaList the list to store; kept by reference, not copied
+   */
+  public void setPipePluginMetaList(List<PipePluginMeta> pipePluginMetaList) {
+    this.pipePluginMetaList = pipePluginMetaList;
+  }
 }
diff --git a/server/src/test/resources/datanode1conf/iotdb-common.properties b/server/src/test/resources/datanode1conf/iotdb-common.properties
index 553d89d795..ba22251554 100644
--- a/server/src/test/resources/datanode1conf/iotdb-common.properties
+++ b/server/src/test/resources/datanode1conf/iotdb-common.properties
@@ -19,4 +19,5 @@
 
 timestamp_precision=ms
 udf_lib_dir=target/datanode1/ext/udf
-trigger_lib_dir=target/datanode1/ext/trigger
\ No newline at end of file
+trigger_lib_dir=target/datanode1/ext/trigger
+pipe_lib_dir=target/datanode1/ext/pipe
diff --git a/server/src/test/resources/datanode2conf/iotdb-common.properties b/server/src/test/resources/datanode2conf/iotdb-common.properties
index 80add439f2..d75d1b5202 100644
--- a/server/src/test/resources/datanode2conf/iotdb-common.properties
+++ b/server/src/test/resources/datanode2conf/iotdb-common.properties
@@ -19,4 +19,5 @@
 
 timestamp_precision=ms
 udf_lib_dir=target/datanode2/ext/udf
-trigger_lib_dir=target/datanode2/ext/trigger
\ No newline at end of file
+trigger_lib_dir=target/datanode2/ext/trigger
+pipe_lib_dir=target/datanode2/ext/pipe
diff --git a/server/src/test/resources/datanode3conf/iotdb-common.properties b/server/src/test/resources/datanode3conf/iotdb-common.properties
index 7a06e6d735..ebd7d4fa59 100644
--- a/server/src/test/resources/datanode3conf/iotdb-common.properties
+++ b/server/src/test/resources/datanode3conf/iotdb-common.properties
@@ -19,4 +19,5 @@
 
 timestamp_precision=ms
 udf_lib_dir=target/datanode3/ext/udf
-trigger_lib_dir=target/datanode3/ext/trigger
\ No newline at end of file
+trigger_lib_dir=target/datanode3/ext/trigger
+pipe_lib_dir=target/datanode3/ext/pipe
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index d9b379e9a1..fc866714f2 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -90,6 +90,7 @@ struct TRuntimeConfiguration {
   2: required list<binary> allTriggerInformation
   3: required list<binary> allUDFInformation
   4: required binary allTTLInformation
+  5: required list<binary> allPipeInformation
 }
 
 struct TDataNodeRegisterReq {
@@ -1100,6 +1101,11 @@ service IConfigNodeRPCService {
    */
   TGetPipePluginTableResp getPipePluginTable();
 
+  /**
+   * Return the pipe plugin jars for the jar names listed in the request
+   */
+  TGetJarInListResp getPipePluginJar(TGetJarInListReq req)
+
   // ======================================================
   // Maintenance Tools
   // ======================================================