You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/03/04 19:02:29 UTC

[iotdb] branch master updated: [IOTDB-5598] Pipe Plugins Management: from SQL to CN (#9175)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dfdb844142 [IOTDB-5598] Pipe Plugins Management: from SQL to CN (#9175)
dfdb844142 is described below

commit dfdb84414288d4bc006deb3a9fda99c053c2d5af
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Sun Mar 5 03:02:23 2023 +0800

    [IOTDB-5598] Pipe Plugins Management: from SQL to CN (#9175)
    
    * init
    
    * add licenses
    
    * format code style in the antlr file
    
    * add auth control
    
    * pipe plugin common package structure minor adjustment
    
    * fix build errors
    
    ---------
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 |   2 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  27 +++-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   8 +
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  26 +++
 .../confignode/conf/ConfigNodeDescriptor.java      |   2 +
 .../iotdb/confignode/manager/ConfigManager.java    |  24 +++
 .../apache/iotdb/confignode/manager/IManager.java  |  11 ++
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  18 +++
 .../iotdb/commons/auth/entity/PrivilegeType.java   |   6 +-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   1 +
 .../commons/pipe/plugin/meta/PipePluginMeta.java   | 136 ++++++++++++++++
 .../pipe/plugin/service/PipePluginClassLoader.java |  90 +++++++++++
 .../service/PipePluginClassLoaderManager.java      | 103 ++++++++++++
 .../service/PipePluginExecutableManager.java       |  50 ++++++
 .../apache/iotdb/commons/service/ServiceType.java  |   5 +-
 .../org/apache/iotdb/db/audit/AuditLogger.java     |   3 +
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |   6 +
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  66 ++++++++
 .../db/mpp/common/header/ColumnHeaderConstant.java |  10 ++
 .../db/mpp/common/header/DatasetHeaderFactory.java |   4 +
 .../plan/execution/config/ConfigTaskVisitor.java   |  24 +++
 .../config/executor/ClusterConfigTaskExecutor.java | 178 ++++++++++++++++++++-
 .../config/executor/IConfigTaskExecutor.java       |   7 +
 .../config/metadata/CreatePipePluginTask.java      |  42 +++++
 .../config/metadata/DropPipePluginTask.java        |  42 +++++
 .../config/metadata/ShowPipePluginsTask.java       |  80 +++++++++
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  34 +++-
 .../iotdb/db/mpp/plan/statement/StatementType.java |   6 +-
 .../db/mpp/plan/statement/StatementVisitor.java    |  16 ++
 .../metadata/CreatePipePluginStatement.java        |  72 +++++++++
 .../metadata/DropPipePluginStatement.java          |  60 +++++++
 .../metadata/ShowPipePluginsStatement.java         |  52 ++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  10 +-
 .../src/main/thrift/confignode.thrift              |  44 +++++
 34 files changed, 1244 insertions(+), 21 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index 5889464db9..11450e868c 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -131,6 +131,8 @@ keyWords
     | PIPESINK
     | PIPESINKS
     | PIPESINKTYPE
+    | PIPEPLUGIN
+    | PIPEPLUGINS
     | POLICY
     | PREVIOUS
     | PREVIOUSUNTILLAST
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 5f04e5406d..d805cb9b9a 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -37,12 +37,12 @@ statement
 
 ddlStatement
     : createStorageGroup | createTimeseries | createSchemaTemplate | createTimeseriesOfSchemaTemplate
-    | createFunction | createTrigger | createContinuousQuery
+    | createFunction | createTrigger | createPipePlugin | createContinuousQuery
     | alterTimeseries | alterStorageGroup | deleteStorageGroup | deleteTimeseries | deletePartition | deleteTimeseriesOfSchemaTemplate
-    | dropFunction | dropTrigger | dropContinuousQuery | dropSchemaTemplate
+    | dropFunction | dropTrigger | dropPipePlugin | dropContinuousQuery | dropSchemaTemplate
     | setTTL | unsetTTL | startTrigger | stopTrigger | setSchemaTemplate | unsetSchemaTemplate
     | showStorageGroup | showDevices | showTimeseries | showChildPaths | showChildNodes
-    | showFunctions | showTriggers | showContinuousQueries | showTTL | showAllTTL | showCluster | showVariables | showRegion | showDataNodes | showConfigNodes
+    | showFunctions | showTriggers | showPipePlugins | showContinuousQueries | showTTL | showAllTTL | showCluster | showVariables | showRegion | showDataNodes | showConfigNodes
     | showSchemaTemplates | showNodesInSchemaTemplate
     | showPathsUsingSchemaTemplate | showPathsSetSchemaTemplate
     | countStorageGroup | countDevices | countTimeseries | countNodes
@@ -119,10 +119,10 @@ createTimeseriesOfSchemaTemplate
 
 // Create Function
 createFunction
-    : CREATE FUNCTION udfName=identifier AS className=STRING_LITERAL uriClasue?
+    : CREATE FUNCTION udfName=identifier AS className=STRING_LITERAL uriClause?
     ;
 
-uriClasue
+uriClause
     : USING URI uri
     ;
 
@@ -136,7 +136,7 @@ createTrigger
         triggerEventClause
         ON prefixPath
         AS className=STRING_LITERAL
-        uriClasue?
+        uriClause?
         triggerAttributeClause?
     ;
 
@@ -156,6 +156,11 @@ triggerAttribute
     : key=attributeKey operator_eq value=attributeValue
     ;
 
+// Create Pipe Plugin
+// Unlike createFunction and createTrigger, where uriClause is optional, it is mandatory here.
+createPipePlugin
+    : CREATE PIPEPLUGIN pluginName=identifier AS className=STRING_LITERAL uriClause
+    ;
+
 // Create Continuous Query
 createContinuousQuery
     : CREATE (CONTINUOUS QUERY | CQ) cqId=identifier
@@ -225,6 +230,11 @@ dropFunction
     : DROP FUNCTION udfName=identifier
     ;
 
+// Drop Pipe Plugin
+dropPipePlugin
+    : DROP PIPEPLUGIN pluginName=identifier
+    ;
+
 // Drop Trigger
 dropTrigger
     : DROP TRIGGER triggerName=identifier
@@ -329,6 +339,11 @@ showTriggers
     : SHOW TRIGGERS
     ;
 
+// Show Pipe Plugins
+showPipePlugins
+    : SHOW PIPEPLUGINS
+    ;
+
 // Show Continuous Queries
 showContinuousQueries
     : SHOW (CONTINUOUS QUERIES | CQS)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 24ba612b93..cdfcea1ad3 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -454,6 +454,14 @@ PIPESINKTYPE
     : P I P E S I N K T Y P E
     ;
 
+PIPEPLUGIN
+    : P I P E P L U G I N
+    ;
+
+PIPEPLUGINS
+    : P I P E P L U G I N S
+    ;
+
 POLICY
     : P O L I C Y
     ;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 71cac276b7..0a725b42b5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -168,6 +168,13 @@ public class ConfigNodeConfig {
   private String triggerTemporaryLibDir =
       triggerDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
 
+  /** External lib directory for pipe, stores user-uploaded JAR files */
+  private String pipeDir =
+      IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.PIPE_FOLDER_NAME;
+
+  /** External temporary lib directory for storing downloaded pipe JAR files */
+  private String pipeTemporaryLibDir = pipeDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
+
   /** Time partition interval in milliseconds */
   private long timePartitionInterval = 604_800_000;
 
@@ -305,6 +312,8 @@ public class ConfigNodeConfig {
     udfTemporaryLibDir = addHomeDir(udfTemporaryLibDir);
     triggerDir = addHomeDir(triggerDir);
     triggerTemporaryLibDir = addHomeDir(triggerTemporaryLibDir);
+    pipeDir = addHomeDir(pipeDir);
+    pipeTemporaryLibDir = addHomeDir(pipeTemporaryLibDir);
   }
 
   private String addHomeDir(String dir) {
@@ -620,6 +629,23 @@ public class ConfigNodeConfig {
     this.triggerTemporaryLibDir = triggerDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
   }
 
+  /** Returns the external lib directory for pipe. */
+  public String getPipeDir() {
+    return pipeDir;
+  }
+
+  /**
+   * Sets the external lib directory for pipe and re-derives the temporary lib directory from it.
+   *
+   * @param pipeDir the new external lib directory for pipe
+   */
+  public void setPipeDir(String pipeDir) {
+    this.pipeDir = pipeDir;
+    // the temporary lib directory lives below pipeDir, so it has to follow every change
+    updatePipeTemporaryLibDir();
+  }
+
+  /** Returns the external temporary lib directory for pipe. */
+  public String getPipeTemporaryLibDir() {
+    return pipeTemporaryLibDir;
+  }
+
+  /**
+   * Recomputes the temporary lib directory for pipe as the {@code IoTDBConstant.TMP_FOLDER_NAME}
+   * sub-directory of the current {@code pipeDir}.
+   */
+  public void updatePipeTemporaryLibDir() {
+    this.pipeTemporaryLibDir = pipeDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
+  }
+
   public int getSchemaReplicationFactor() {
     return schemaReplicationFactor;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 4aa8d54522..d212c6819c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -330,6 +330,8 @@ public class ConfigNodeDescriptor {
 
     conf.setTriggerDir(properties.getProperty("trigger_lib_dir", conf.getTriggerDir()).trim());
 
+    conf.setPipeDir(properties.getProperty("pipe_lib_dir", conf.getPipeDir()).trim());
+
     conf.setTimePartitionInterval(
         Long.parseLong(
             properties
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 1c3023588f..8680b11ff7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -107,6 +107,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -127,6 +128,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -1222,6 +1224,28 @@ public class ConfigManager implements IManager {
         : new TGetJarInListResp(status, Collections.emptyList());
   }
 
+  /**
+   * Stub for creating a pipe plugin: nothing is created yet. The request is only logged and the
+   * status obtained from {@code confirmLeader()} is returned as is.
+   */
+  @Override
+  public TSStatus createPipePlugin(TCreatePipePluginReq req) {
+    // TODO: implement PipeManager
+    TSStatus status = confirmLeader();
+    LOGGER.info("createPipePlugin: {}", req);
+    return status;
+  }
+
+  /**
+   * Stub for dropping a pipe plugin: {@code pipePluginName} is not used yet and the status
+   * obtained from {@code confirmLeader()} is returned as is.
+   */
+  @Override
+  public TSStatus dropPipePlugin(String pipePluginName) {
+    // TODO: implement PipeManager
+    TSStatus status = confirmLeader();
+    return status;
+  }
+
+  /**
+   * Stub for listing pipe plugins: always answers with an empty list, paired with the status
+   * obtained from {@code confirmLeader()}.
+   */
+  @Override
+  public TGetPipePluginTableResp getPipePluginTable() {
+    // TODO: implement PipeManager
+    TSStatus status = confirmLeader();
+    return new TGetPipePluginTableResp(status, Collections.emptyList());
+  }
+
   @Override
   public TSStatus merge() {
     TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index b82577df94..9b1e1d55f6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -72,6 +73,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -417,6 +419,15 @@ public interface IManager {
   /** Get Trigger jar */
   TGetJarInListResp getTriggerJar(TGetJarInListReq req);
 
+  /**
+   * Create pipe plugin
+   *
+   * @param req the request describing the pipe plugin to create
+   * @return the resulting status
+   */
+  TSStatus createPipePlugin(TCreatePipePluginReq req);
+
+  /**
+   * Drop pipe plugin
+   *
+   * @param pluginName the name of the pipe plugin to drop
+   * @return the resulting status
+   */
+  TSStatus dropPipePlugin(String pluginName);
+
+  /**
+   * Show pipe plugins
+   *
+   * @return the response carrying the pipe plugin table
+   */
+  TGetPipePluginTableResp getPipePluginTable();
+
   /** Merge on all DataNodes */
   TSStatus merge();
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 248426f1c5..f8dc8c7efa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -98,6 +99,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
@@ -107,6 +109,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -691,6 +694,21 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return configManager.getTriggerJar(req);
   }
 
+  /** Forwards the create-pipe-plugin request unchanged to {@code configManager}. */
+  @Override
+  public TSStatus createPipePlugin(TCreatePipePluginReq req) {
+    return configManager.createPipePlugin(req);
+  }
+
+  /** Extracts the plugin name from the request and forwards it to {@code configManager}. */
+  @Override
+  public TSStatus dropPipePlugin(TDropPipePluginReq req) {
+    return configManager.dropPipePlugin(req.getPluginName());
+  }
+
+  /** Forwards the pipe plugin table query to {@code configManager}. */
+  @Override
+  public TGetPipePluginTableResp getPipePluginTable() {
+    return configManager.getPipePluginTable();
+  }
+
   @Override
   public TSStatus merge() throws TException {
     return configManager.merge();
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/PrivilegeType.java b/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/PrivilegeType.java
index 36c2ecadca..34a7b68472 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/PrivilegeType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/PrivilegeType.java
@@ -56,7 +56,11 @@ public enum PrivilegeType {
   READ_TEMPLATE,
   APPLY_TEMPLATE(true),
   READ_TEMPLATE_APPLICATION,
-  SHOW_CONTINUOUS_QUERIES;
+  SHOW_CONTINUOUS_QUERIES,
+  CREATE_PIPEPLUGIN,
+  DROP_PIPEPLUGIN,
+  SHOW_PIPEPLUGINS,
+  ;
 
   private static final int PRIVILEGE_COUNT = values().length;
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 628fdbf8a2..92c6a6aa77 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -223,6 +223,7 @@ public class IoTDBConstant {
   public static final String EXT_FOLDER_NAME = "ext";
   public static final String UDF_FOLDER_NAME = "udf";
   public static final String TRIGGER_FOLDER_NAME = "trigger";
+  public static final String PIPE_FOLDER_NAME = "pipe";
   public static final String TMP_FOLDER_NAME = "tmp";
 
   public static final String MQTT_FOLDER_NAME = "mqtt";
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
new file mode 100644
index 0000000000..c4ab9ef69d
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.meta;
+
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Metadata of a pipe plugin: plugin name, class name, plugin type, and the name and MD5 of a JAR
+ * (presumably the JAR that contains the plugin class).
+ *
+ * <p>Plugin names are stored upper-cased, no matter whether they arrive through the public
+ * constructor, through {@link #setPluginName(String)} or through deserialization. This keeps a
+ * freshly constructed instance equal to its own serialize/deserialize round trip.
+ */
+public class PipePluginMeta {
+  private String pluginName;
+  private String className;
+  private String pluginType;
+  private String jarName;
+  private String jarMD5;
+
+  /** Only used by {@link #deserialize(ByteBuffer)}, which fills in every field afterwards. */
+  private PipePluginMeta() {}
+
+  /**
+   * Creates a meta without JAR information; {@code jarName} and {@code jarMD5} stay {@code null}
+   * until set explicitly.
+   *
+   * @param pluginName the plugin name, stored upper-cased exactly like {@link #setPluginName}
+   * @param className the class name of the plugin
+   * @param pluginType the plugin type
+   * @throws NullPointerException if {@code pluginName} is {@code null}
+   */
+  public PipePluginMeta(String pluginName, String className, String pluginType) {
+    // normalize here as well, otherwise a lower-case name would differ from its deserialized copy
+    this.pluginName = Objects.requireNonNull(pluginName, "pluginName").toUpperCase();
+    this.className = className;
+    this.pluginType = pluginType;
+  }
+
+  public String getPluginName() {
+    return pluginName;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public String getPluginType() {
+    return pluginType;
+  }
+
+  public String getJarName() {
+    return jarName;
+  }
+
+  public String getJarMD5() {
+    return jarMD5;
+  }
+
+  /**
+   * Sets the plugin name, upper-casing it first.
+   *
+   * @throws NullPointerException if {@code pluginName} is {@code null}
+   */
+  public void setPluginName(String pluginName) {
+    this.pluginName = pluginName.toUpperCase();
+  }
+
+  public void setClassName(String className) {
+    this.className = className;
+  }
+
+  public void setPluginType(String pluginType) {
+    this.pluginType = pluginType;
+  }
+
+  public void setJarName(String jarName) {
+    this.jarName = jarName;
+  }
+
+  public void setJarMD5(String jarMD5) {
+    this.jarMD5 = jarMD5;
+  }
+
+  /**
+   * Serializes this meta into a fresh buffer.
+   *
+   * @return a buffer wrapping the bytes written by {@link #serialize(DataOutputStream)}
+   * @throws IOException if writing to the in-memory stream fails
+   */
+  public ByteBuffer serialize() throws IOException {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+    serialize(outputStream);
+    // wrap only the bytes actually written, not the whole backing array
+    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+  }
+
+  /**
+   * Writes the five fields in the order pluginName, className, pluginType, jarName, jarMD5, which
+   * is the order {@link #deserialize(ByteBuffer)} reads them back.
+   *
+   * @throws IOException if writing to {@code outputStream} fails
+   */
+  public void serialize(DataOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(pluginName, outputStream);
+    ReadWriteIOUtils.write(className, outputStream);
+    ReadWriteIOUtils.write(pluginType, outputStream);
+    ReadWriteIOUtils.write(jarName, outputStream);
+    ReadWriteIOUtils.write(jarMD5, outputStream);
+  }
+
+  /**
+   * Reads a meta from {@code buffer}, consuming five strings in serialization order.
+   *
+   * @throws NullPointerException if the plugin name read from the buffer is {@code null}
+   */
+  public static PipePluginMeta deserialize(ByteBuffer buffer) {
+    PipePluginMeta pipePluginMeta = new PipePluginMeta();
+    pipePluginMeta.setPluginName(Objects.requireNonNull(ReadWriteIOUtils.readString(buffer)));
+    pipePluginMeta.setClassName(ReadWriteIOUtils.readString(buffer));
+    pipePluginMeta.setPluginType(ReadWriteIOUtils.readString(buffer));
+    pipePluginMeta.setJarName(ReadWriteIOUtils.readString(buffer));
+    pipePluginMeta.setJarMD5(ReadWriteIOUtils.readString(buffer));
+    return pipePluginMeta;
+  }
+
+  /**
+   * Reads a meta from {@code inputStream}.
+   *
+   * <p>NOTE(review): the bytes are fetched with {@code readBytesWithSelfDescriptionLength}, while
+   * {@link #serialize(DataOutputStream)} writes no length of its own; presumably the writer of
+   * the stream adds it. Confirm against the caller.
+   *
+   * @throws IOException if reading from {@code inputStream} fails
+   */
+  public static PipePluginMeta deserialize(InputStream inputStream) throws IOException {
+    return deserialize(
+        ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PipePluginMeta that = (PipePluginMeta) o;
+    return Objects.equals(pluginName, that.pluginName)
+        && Objects.equals(className, that.className)
+        && Objects.equals(pluginType, that.pluginType)
+        && Objects.equals(jarName, that.jarName)
+        && Objects.equals(jarMD5, that.jarMD5);
+  }
+
+  /** Hashes the plugin name only; equal metas have equal names, so the contract holds. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(pluginName);
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoader.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoader.java
new file mode 100644
index 0000000000..d221e21a96
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoader.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.service;
+
+import org.apache.iotdb.commons.file.SystemFileFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A {@link URLClassLoader} over every non-directory file found below a lib root, combined with a
+ * usage counter that decides when the classloader may be closed.
+ */
+@ThreadSafe
+public class PipePluginClassLoader extends URLClassLoader {
+
+  private final String libRoot;
+
+  /**
+   * Number of instances currently using this classloader. The classloader can only be closed
+   * while this count equals 0.
+   */
+  private final AtomicLong activeInstanceCount;
+
+  /**
+   * Set once this classloader has been marked as deprecated. A deprecated classloader is closed
+   * as soon as no instance uses it any more.
+   */
+  private volatile boolean deprecated;
+
+  /**
+   * @param libRoot the directory whose files are added to the classpath of this classloader
+   * @throws IOException if walking {@code libRoot} or building a file URL fails
+   */
+  public PipePluginClassLoader(String libRoot) throws IOException {
+    super(new URL[0]);
+    this.libRoot = libRoot;
+    this.activeInstanceCount = new AtomicLong(0);
+    this.deprecated = false;
+    addURLs();
+  }
+
+  /** Walks libRoot recursively and registers the URL of everything that is not a directory. */
+  private void addURLs() throws IOException {
+    final Path root = SystemFileFactory.INSTANCE.getFile(libRoot).toPath();
+    try (Stream<Path> walked = Files.walk(root)) {
+      // the walk is collected completely before the first URL is registered
+      for (Path file :
+          walked.filter(PipePluginClassLoader::isNotDirectory).collect(Collectors.toList())) {
+        super.addURL(file.toUri().toURL());
+      }
+    }
+  }
+
+  private static boolean isNotDirectory(Path candidate) {
+    return !candidate.toFile().isDirectory();
+  }
+
+  /** Registers one more user of this classloader. */
+  public synchronized void acquire() {
+    activeInstanceCount.incrementAndGet();
+  }
+
+  /**
+   * Unregisters one user of this classloader and closes it if that has become possible.
+   *
+   * @throws IOException if closing the classloader fails
+   */
+  public synchronized void release() throws IOException {
+    activeInstanceCount.decrementAndGet();
+    closeIfPossible();
+  }
+
+  /**
+   * Marks this classloader as deprecated and closes it if that is already possible.
+   *
+   * @throws IOException if closing the classloader fails
+   */
+  public synchronized void markAsDeprecated() throws IOException {
+    deprecated = true;
+    closeIfPossible();
+  }
+
+  /** Closes this classloader only when it is deprecated and the usage count is 0. */
+  private void closeIfPossible() throws IOException {
+    if (!deprecated || activeInstanceCount.get() != 0) {
+      return;
+    }
+    close();
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
new file mode 100644
index 0000000000..3766af8bd1
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.service;
+
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Keeps the currently active {@link PipePluginClassLoader} for one lib root and replaces it on
+ * request, marking the replaced classloader as deprecated.
+ */
+@NotThreadSafe
+public class PipePluginClassLoaderManager implements IService {
+
+  private final String libRoot;
+
+  /**
+   * activeClassLoader is used to load all classes under libRoot. libRoot may be updated before the
+   * user executes CREATE PIPEPLUGIN or after the user executes DROP PIPEPLUGIN. Therefore, we need
+   * to continuously maintain the activeClassLoader so that the classes it loads are always
+   * up-to-date.
+   */
+  private volatile PipePluginClassLoader activeClassLoader;
+
+  /**
+   * @param libRoot the directory the managed classloaders are built over; created if necessary
+   * @throws IOException if the directory cannot be prepared or walked
+   */
+  private PipePluginClassLoaderManager(String libRoot) throws IOException {
+    this.libRoot = libRoot;
+    // PipePluginClassLoader walks libRoot on construction, which fails for a missing directory,
+    // so make sure it exists before the first classloader is built
+    SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
+    activeClassLoader = new PipePluginClassLoader(libRoot);
+  }
+
+  /**
+   * Builds a new classloader over the current content of libRoot, makes it the active one and
+   * marks the previously active classloader as deprecated.
+   *
+   * @return the new active classloader
+   * @throws IOException if the new classloader cannot be built or the old one fails to close; in
+   *     the first case the previously active classloader stays active
+   */
+  public PipePluginClassLoader updateAndGetActiveClassLoader() throws IOException {
+    PipePluginClassLoader deprecatedClassLoader = activeClassLoader;
+    PipePluginClassLoader freshClassLoader = new PipePluginClassLoader(libRoot);
+    activeClassLoader = freshClassLoader;
+    if (deprecatedClassLoader != null) {
+      deprecatedClassLoader.markAsDeprecated();
+    }
+    return freshClassLoader;
+  }
+
+  public PipePluginClassLoader getActiveClassLoader() {
+    return activeClassLoader;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // IService
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Ensures libRoot exists and refreshes the active classloader.
+   *
+   * @throws StartupException if the directory or the classloader cannot be set up
+   */
+  @Override
+  public void start() throws StartupException {
+    try {
+      SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
+      // go through the regular replacement path so that the classloader created by the
+      // constructor is marked as deprecated instead of being silently dropped
+      updateAndGetActiveClassLoader();
+    } catch (IOException e) {
+      // NOTE(review): only the message is forwarded, the cause is lost; use a cause-accepting
+      // StartupException constructor here if one exists
+      throw new StartupException(this.getID().getName(), e.getMessage());
+    }
+  }
+
+  @Override
+  public void stop() {
+    // nothing to do
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // singleton instance holder
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  private static PipePluginClassLoaderManager INSTANCE = null;
+
+  /**
+   * Returns the singleton, creating it over {@code libRoot} on the first call. Later calls ignore
+   * {@code libRoot} and return the existing instance.
+   *
+   * @throws IOException if the first instance cannot be created
+   */
+  public static synchronized PipePluginClassLoaderManager getInstance(String libRoot)
+      throws IOException {
+    if (INSTANCE == null) {
+      INSTANCE = new PipePluginClassLoaderManager(libRoot);
+    }
+    return INSTANCE;
+  }
+
+  /** Returns the singleton, or {@code null} if {@link #getInstance(String)} was never called. */
+  public static PipePluginClassLoaderManager getInstance() {
+    return INSTANCE;
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
new file mode 100644
index 0000000000..0466f888ee
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.service;
+
+import org.apache.iotdb.commons.executable.ExecutableManager;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * {@link ExecutableManager} specialization for pipe plugin executables, exposed as a lazily
+ * created process-wide singleton.
+ */
+public class PipePluginExecutableManager extends ExecutableManager {
+
+  // volatile: getInstance() reads this field without synchronization, so the instance written by
+  // the synchronized setupAndGetInstance() must be safely published to other threads.
+  private static volatile PipePluginExecutableManager INSTANCE = null;
+
+  /**
+   * Forwards both directories to the {@link ExecutableManager} constructor. Does not create any
+   * directory; use {@link #setupAndGetInstance(String, String)} for that.
+   *
+   * @param temporaryLibRoot first argument handed to the superclass constructor
+   * @param libRoot second argument handed to the superclass constructor
+   */
+  public PipePluginExecutableManager(String temporaryLibRoot, String libRoot) {
+    super(temporaryLibRoot, libRoot);
+  }
+
+  /**
+   * Creates the singleton on the first call, after making sure that {@code temporaryLibRoot},
+   * {@code libRoot} and {@code libRoot/INSTALL_DIR} exist. Later calls return the existing
+   * instance and ignore both arguments.
+   *
+   * @param temporaryLibRoot temporary directory to prepare and pass to the constructor
+   * @param libRoot library directory to prepare and pass to the constructor
+   * @return the singleton instance, never {@code null}
+   * @throws IOException if one of the directories cannot be prepared
+   */
+  public static synchronized PipePluginExecutableManager setupAndGetInstance(
+      String temporaryLibRoot, String libRoot) throws IOException {
+    if (INSTANCE == null) {
+      SystemFileFactory.INSTANCE.makeDirIfNecessary(temporaryLibRoot);
+      SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
+      SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot + File.separator + INSTALL_DIR);
+      INSTANCE = new PipePluginExecutableManager(temporaryLibRoot, libRoot);
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Returns the singleton without creating it.
+   *
+   * @return the singleton, or {@code null} if {@link #setupAndGetInstance(String, String)} has not
+   *     been called yet
+   */
+  public static PipePluginExecutableManager getInstance() {
+    return INSTANCE;
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 4dcfd16a3d..101b50405f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -72,8 +72,9 @@ public enum ServiceType {
   FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager"),
   MPP_DATA_EXCHANGE_SERVICE("MPP Data exchange manager", "MPPDataExchangeManager"),
   INTERNAL_SERVICE("Internal Service", "InternalService"),
-  IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService");
-
+  IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService"),
+  PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE(
+      "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader");
   private final String name;
   private final String jmxName;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java b/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index 3d1fe71dea..c3e05f4ffc 100644
--- a/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -186,6 +186,8 @@ public class AuditLogger {
       case STOP_PIPE:
       case DROP_PIPE:
       case DEACTIVATE_TEMPLATE:
+      case CREATE_PIPEPLUGIN:
+      case DROP_PIPEPLUGIN:
         return AuditLogOperation.DDL;
       case LOAD_DATA:
       case INSERT:
@@ -229,6 +231,7 @@ public class AuditLogger {
       case FETCH_SCHEMA:
       case COUNT:
       case SHOW_TRIGGERS:
+      case SHOW_PIPEPLUGINS:
         return AuditLogOperation.QUERY;
       default:
         logger.error("Unrecognizable operator type ({}) for audit log", type);
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index c661c27421..094328ea79 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -237,6 +237,12 @@ public class AuthorityChecker {
         return PrivilegeType.READ_TEMPLATE_APPLICATION.ordinal();
       case SHOW_CONTINUOUS_QUERIES:
         return PrivilegeType.SHOW_CONTINUOUS_QUERIES.ordinal();
+      case CREATE_PIPEPLUGIN:
+        return PrivilegeType.CREATE_PIPEPLUGIN.ordinal();
+      case DROP_PIPEPLUGIN:
+        return PrivilegeType.DROP_PIPEPLUGIN.ordinal();
+      case SHOW_PIPEPLUGINS:
+        return PrivilegeType.SHOW_PIPEPLUGINS.ordinal();
       default:
         logger.error("Unrecognizable operator type ({}) for AuthorityChecker.", type);
         return -1;
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 7ebe15a043..a0673904c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -64,6 +65,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
@@ -73,6 +75,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -1401,6 +1404,69 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  /**
+   * Forwards a create-pipe-plugin request to the ConfigNode, retrying up to {@code RETRY_NUM}
+   * times.
+   *
+   * @param req the request handed to the underlying thrift client unchanged
+   * @return the first status for which {@code updateConfigNodeLeader} reports {@code false}
+   * @throws TException with {@code MSG_RECONNECTION_FAIL} once every attempt has been used up
+   */
+  @Override
+  public TSStatus createPipePlugin(TCreatePipePluginReq req) throws TException {
+    int attempt = 0;
+    while (attempt < RETRY_NUM) {
+      try {
+        final TSStatus result = client.createPipePlugin(req);
+        // a true return value sends us through waitAndReconnect() and round the loop again
+        final boolean retryNeeded = updateConfigNodeLeader(result);
+        if (!retryNeeded) {
+          return result;
+        }
+      } catch (TException e) {
+        logger.warn(
+            "Failed to connect to ConfigNode {} from DataNode {} when executing {}",
+            configNode,
+            config.getAddressAndPort(),
+            Thread.currentThread().getStackTrace()[1].getMethodName());
+        // forget the cached leader before reconnecting
+        configLeader = null;
+      }
+      waitAndReconnect();
+      attempt++;
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
+  /**
+   * Forwards a drop-pipe-plugin request to the ConfigNode, retrying up to {@code RETRY_NUM}
+   * times.
+   *
+   * @param req the request handed to the underlying thrift client unchanged
+   * @return the first status for which {@code updateConfigNodeLeader} reports {@code false}
+   * @throws TException with {@code MSG_RECONNECTION_FAIL} once every attempt has been used up
+   */
+  @Override
+  public TSStatus dropPipePlugin(TDropPipePluginReq req) throws TException {
+    int attempt = 0;
+    while (attempt < RETRY_NUM) {
+      try {
+        final TSStatus result = client.dropPipePlugin(req);
+        // a true return value sends us through waitAndReconnect() and round the loop again
+        final boolean retryNeeded = updateConfigNodeLeader(result);
+        if (!retryNeeded) {
+          return result;
+        }
+      } catch (TException e) {
+        logger.warn(
+            "Failed to connect to ConfigNode {} from DataNode {} when executing {}",
+            configNode,
+            config.getAddressAndPort(),
+            Thread.currentThread().getStackTrace()[1].getMethodName());
+        // forget the cached leader before reconnecting
+        configLeader = null;
+      }
+      waitAndReconnect();
+      attempt++;
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
+  /**
+   * Fetches the pipe plugin table from the ConfigNode, retrying up to {@code RETRY_NUM} times.
+   *
+   * @return the first response whose status makes {@code updateConfigNodeLeader} report {@code
+   *     false}
+   * @throws TException with {@code MSG_RECONNECTION_FAIL} once every attempt has been used up
+   */
+  @Override
+  public TGetPipePluginTableResp getPipePluginTable() throws TException {
+    int attempt = 0;
+    while (attempt < RETRY_NUM) {
+      try {
+        final TGetPipePluginTableResp response = client.getPipePluginTable();
+        // a true return value sends us through waitAndReconnect() and round the loop again
+        final boolean retryNeeded = updateConfigNodeLeader(response.getStatus());
+        if (!retryNeeded) {
+          return response;
+        }
+      } catch (TException e) {
+        logger.warn(
+            "Failed to connect to ConfigNode {} from DataNode {} when executing {}",
+            configNode,
+            config.getAddressAndPort(),
+            Thread.currentThread().getStackTrace()[1].getMethodName());
+        // forget the cached leader before reconnecting
+        configLeader = null;
+      }
+      waitAndReconnect();
+      attempt++;
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index baaf9a773c..61713b5e34 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -96,6 +96,10 @@ public class ColumnHeaderConstant {
   public static final String PATH_PATTERN = "PathPattern";
   public static final String CLASS_NAME = "ClassName";
 
+  // column names for show pipe plugins statement
+  public static final String PLUGIN_NAME = "PluginName";
+  public static final String PLUGIN_TYPE = "PluginType";
+
   // show cluster status
   public static final String NODE_TYPE_CONFIG_NODE = "ConfigNode";
   public static final String NODE_TYPE_DATA_NODE = "DataNode";
@@ -325,6 +329,12 @@ public class ColumnHeaderConstant {
           new ColumnHeader(CLASS_NAME, TSDataType.TEXT),
           new ColumnHeader(NODE_ID, TSDataType.TEXT));
 
+  // Result columns of the show pipe plugins statement, in output order: plugin name, plugin type,
+  // class name. All three are TEXT columns.
+  public static final List<ColumnHeader> showPipePluginsColumnHeaders =
+      ImmutableList.of(
+          new ColumnHeader(PLUGIN_NAME, TSDataType.TEXT),
+          new ColumnHeader(PLUGIN_TYPE, TSDataType.TEXT),
+          new ColumnHeader(CLASS_NAME, TSDataType.TEXT));
+
   public static final List<ColumnHeader> showSchemaTemplateHeaders =
       ImmutableList.of(new ColumnHeader(TEMPLATE_NAME, TSDataType.TEXT));
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index 7e8eb19b05..e47e4370ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -99,6 +99,10 @@ public class DatasetHeaderFactory {
     return new DatasetHeader(ColumnHeaderConstant.showTriggersColumnHeaders, true);
   }
 
+  /**
+   * Builds the dataset header for the show pipe plugins result from {@link
+   * ColumnHeaderConstant#showPipePluginsColumnHeaders}.
+   *
+   * <p>The second constructor argument is {@code true}, as in the neighbouring show-* factory
+   * methods; presumably it marks the result as having no timestamp column - TODO confirm against
+   * {@code DatasetHeader}.
+   */
+  public static DatasetHeader getShowPipePluginsHeader() {
+    return new DatasetHeader(ColumnHeaderConstant.showPipePluginsColumnHeaders, true);
+  }
+
   public static DatasetHeader getShowRegionHeader() {
     return new DatasetHeader(ColumnHeaderConstant.showRegionColumnHeaders, true);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index b5d71185e6..8aa4efdfac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -22,12 +22,14 @@ package org.apache.iotdb.db.mpp.plan.execution.config;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CountStorageGroupTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreateContinuousQueryTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreateFunctionTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreatePipePluginTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreateTriggerTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DatabaseSchemaTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DeleteStorageGroupTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DeleteTimeSeriesTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropContinuousQueryTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropFunctionTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropPipePluginTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropTriggerTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.GetRegionIdTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.GetSeriesSlotListTask;
@@ -40,6 +42,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowConfigNodesTas
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowContinuousQueriesTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowDataNodesTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowFunctionsTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowPipePluginsTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowRegionTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowStorageGroupTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTTLTask;
@@ -75,12 +78,14 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreatePipePluginStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropPipePluginStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.GetSeriesSlotListStatement;
@@ -92,6 +97,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowConfigNodesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowContinuousQueriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDataNodesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowFunctionsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowPipePluginsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowRegionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement;
@@ -269,6 +275,24 @@ public class ConfigTaskVisitor
     return new ShowTriggersTask();
   }
 
+  /**
+   * Wraps the statement in a {@link CreatePipePluginTask}.
+   *
+   * @param createPipePluginStatement statement handed to the task as-is
+   * @param context not used by this method
+   */
+  @Override
+  public IConfigTask visitCreatePipePlugin(
+      CreatePipePluginStatement createPipePluginStatement, TaskContext context) {
+    return new CreatePipePluginTask(createPipePluginStatement);
+  }
+
+  /**
+   * Wraps the statement in a {@link DropPipePluginTask}.
+   *
+   * @param dropPipePluginStatement statement handed to the task as-is
+   * @param context not used by this method
+   */
+  @Override
+  public IConfigTask visitDropPipePlugin(
+      DropPipePluginStatement dropPipePluginStatement, TaskContext context) {
+    return new DropPipePluginTask(dropPipePluginStatement);
+  }
+
+  /**
+   * Creates a {@link ShowPipePluginsTask}. Neither the statement nor the context is read; the
+   * task is built without arguments.
+   */
+  @Override
+  public IConfigTask visitShowPipePlugins(
+      ShowPipePluginsStatement showPipePluginStatement, TaskContext context) {
+    return new ShowPipePluginsTask();
+  }
+
   @Override
   public IConfigTask visitShowRegion(ShowRegionStatement showRegionStatement, TaskContext context) {
     return new ShowRegionTask(showRegionStatement);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index a079c260e4..14838989c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -32,12 +32,15 @@ import org.apache.iotdb.commons.executable.ExecutableManager;
 import org.apache.iotdb.commons.executable.ExecutableResource;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFClassLoader;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -47,8 +50,10 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -109,6 +114,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreatePipePluginStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
@@ -336,7 +342,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
           future.setException(
               new IoTDBException(
                   "URI is empty, please specify the URI.",
-                  TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode()));
+                  TSStatusCode.UDF_DOWNLOAD_ERROR.getStatusCode()));
           return future;
         }
         jarFileName = new File(uriString).getName();
@@ -346,7 +352,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
             future.setException(
                 new IoTDBException(
                     "The scheme of URI is not set, please specify the scheme of URI.",
-                    TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode()));
+                    TSStatusCode.UDF_DOWNLOAD_ERROR.getStatusCode()));
             return future;
           }
           if (!uri.getScheme().equals("file")) {
@@ -470,7 +476,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
                 getUDFTableResp.getStatus().message, getUDFTableResp.getStatus().code));
         return future;
       }
-      // convert triggerTable and buildTsBlock
+      // convert UDFTable and buildTsBlock
       ShowFunctionsTask.buildTsBlock(getUDFTableResp.getAllUDFInformation(), future);
     } catch (ClientManagerException | TException e) {
       future.setException(e);
@@ -507,7 +513,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
           future.setException(
               new IoTDBException(
                   "URI is empty, please specify the URI.",
-                  TSStatusCode.UDF_DOWNLOAD_ERROR.getStatusCode()));
+                  TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode()));
           return future;
         }
         jarFileName = new File(uriString).getName();
@@ -654,6 +660,170 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
     return future;
   }
 
+  /**
+   * Creates a pipe plugin: obtains the jar named by the statement's URI, checks that the plugin
+   * class can be loaded and instantiated from it, and then sends the jar to the ConfigNode.
+   *
+   * <p>Steps, each of which fails the returned future instead of throwing:
+   *
+   * <ol>
+   *   <li>Reject a null/empty URI or a URI without a scheme.
+   *   <li>For a non-{@code file} scheme, request the executable through {@link
+   *       PipePluginExecutableManager} and use the copy under its temporary directory; for {@code
+   *       file}, use the local path directly.
+   *   <li>Read the jar into a {@link ByteBuffer} and compute its MD5.
+   *   <li>Load the class through a throw-away {@link PipePluginClassLoader} and invoke its no-arg
+   *       constructor.
+   *   <li>Call {@code createPipePlugin} on the ConfigNode with the jar renamed to {@code
+   *       <base>-<md5>.<ext>}.
+   * </ol>
+   *
+   * @param createPipePluginStatement supplies the plugin name, class name and jar URI
+   * @return a future completed with a success result, or failed with an {@link IoTDBException}
+   *     (or the underlying client/IO exception)
+   */
+  @Override
+  public SettableFuture<ConfigTaskResult> createPipePlugin(
+      CreatePipePluginStatement createPipePluginStatement) {
+    final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    final String pluginName = createPipePluginStatement.getPluginName();
+    final String className = createPipePluginStatement.getClassName();
+    final String uriString = createPipePluginStatement.getUriString();
+
+    if (uriString == null || uriString.isEmpty()) {
+      future.setException(
+          new IoTDBException(
+              "Failed to create pipe plugin, because the URI is empty.",
+              TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+      return future;
+    }
+
+    try (final ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      String libRoot;
+      ByteBuffer jarFile;
+      String jarMd5;
+
+      final String jarFileName = new File(uriString).getName();
+      try {
+        final URI uri = new URI(uriString);
+        if (uri.getScheme() == null) {
+          future.setException(
+              new IoTDBException(
+                  "The scheme of URI is not set, please specify the scheme of URI.",
+                  TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+          return future;
+        }
+        // libRoot should be the path of the specified jar
+        if (!uri.getScheme().equals("file")) {
+          // Download executable
+          ExecutableResource resource =
+              PipePluginExecutableManager.getInstance()
+                  .request(Collections.singletonList(uriString));
+          libRoot =
+              PipePluginExecutableManager.getInstance()
+                      .getDirStringUnderTempRootByRequestId(resource.getRequestId())
+                  + File.separator
+                  + jarFileName;
+        } else {
+          libRoot = new File(uri).getAbsolutePath();
+        }
+        // Whichever branch was taken, the jar is now a local file: transfer it to a ByteBuffer so
+        // that it can be sent to ConfigNode.
+        jarFile = ExecutableManager.transferToBytebuffer(libRoot);
+        // Set md5 of the jar file. md5Hex does not close the stream it is given, so close it here.
+        try (java.io.InputStream jarStream = Files.newInputStream(Paths.get(libRoot))) {
+          jarMd5 = DigestUtils.md5Hex(jarStream);
+        }
+      } catch (IOException | URISyntaxException | IllegalArgumentException e) {
+        // IllegalArgumentException: new File(URI) and Paths.get reject malformed file URIs/paths
+        // with unchecked exceptions; report them like any other bad URI.
+        LOGGER.warn(
+            "Failed to get executable for PipePlugin({}) using URI: {}, the cause is: {}",
+            pluginName,
+            uriString,
+            e);
+        future.setException(
+            new IoTDBException(
+                "Failed to get executable for PipePlugin '"
+                    + pluginName
+                    + "', please check the URI.",
+                TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+        return future;
+      }
+
+      // try to create instance, this request will fail if creation is not successful
+      try (PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) {
+        // ensure that jar file contains the class and that it has an accessible no-arg constructor
+        // NOTE(review): the instance is never cast to a pipe plugin type here, so this does not
+        // verify that the class actually is a pipe plugin - confirm whether that check is intended.
+        Class<?> clazz = Class.forName(className, true, classLoader);
+        clazz.getDeclaredConstructor().newInstance();
+      } catch (ClassNotFoundException
+          | NoSuchMethodException
+          | InstantiationException
+          | IllegalAccessException
+          | InvocationTargetException
+          | ClassCastException e) {
+        LOGGER.warn(
+            "Failed to create pipe plugin when try to create PipePlugin({}) instance first, the cause is: {}",
+            pluginName,
+            e);
+        future.setException(
+            new IoTDBException(
+                "Failed to load class '"
+                    + className
+                    + "', because it's not found in jar file: "
+                    + uriString,
+                TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode()));
+        return future;
+      }
+
+      // NOTE(review): a jar file name without a '.' makes lastIndexOf return -1 and substring
+      // throw StringIndexOutOfBoundsException out of this method - decide how such names should
+      // be handled.
+      final TSStatus executionStatus =
+          client.createPipePlugin(
+              new TCreatePipePluginReq()
+                  .setPluginName(pluginName)
+                  .setClassName(className)
+                  .setJarFile(jarFile)
+                  .setJarMD5(jarMd5)
+                  .setJarName(
+                      String.format(
+                          "%s-%s.%s",
+                          jarFileName.substring(0, jarFileName.lastIndexOf(".")),
+                          jarMd5,
+                          jarFileName.substring(jarFileName.lastIndexOf(".") + 1))));
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+        LOGGER.warn(
+            "Failed to create PipePlugin {}({}) because {}",
+            pluginName,
+            className,
+            executionStatus.getMessage());
+        future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (ClientManagerException | TException | IOException e) {
+      future.setException(e);
+    }
+    return future;
+  }
+
+  /**
+   * Asks the ConfigNode to drop the named pipe plugin.
+   *
+   * @param pluginName name placed into the {@link TDropPipePluginReq}
+   * @return a future completed with a success result, failed with an {@link IoTDBException}
+   *     carrying the ConfigNode's message and code, or failed with the client exception
+   */
+  @Override
+  public SettableFuture<ConfigTaskResult> dropPipePlugin(String pluginName) {
+    final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try (final ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      final TDropPipePluginReq request = new TDropPipePluginReq(pluginName);
+      final TSStatus executionStatus = client.dropPipePlugin(request);
+      final boolean succeeded =
+          TSStatusCode.SUCCESS_STATUS.getStatusCode() == executionStatus.getCode();
+      if (succeeded) {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      } else {
+        LOGGER.warn("[{}] Failed to drop pipe plugin {}.", executionStatus, pluginName);
+        future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+      }
+    } catch (ClientManagerException | TException e) {
+      future.setException(e);
+    }
+    return future;
+  }
+
+  /**
+   * Fetches the pipe plugin table from the ConfigNode and turns it into the statement result.
+   *
+   * @return a future that is failed with an {@link IoTDBException} when the ConfigNode reports a
+   *     non-success status, failed with the client exception on connection problems, and otherwise
+   *     handed to {@code buildTsBlock} together with the fetched plugin information
+   */
+  @Override
+  public SettableFuture<ConfigTaskResult> showPipePlugins() {
+    final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try (final ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      final TGetPipePluginTableResp getPipePluginTableResp = client.getPipePluginTable();
+      final TSStatus status = getPipePluginTableResp.getStatus();
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        future.setException(new IoTDBException(status.message, status.code));
+        return future;
+      }
+      // convert PipePluginTable and buildTsBlock
+      // NOTE(review): this goes through ShowFunctionsTask, i.e. the UDF result builder, although
+      // the payload is pipe plugin information - confirm whether ShowPipePluginsTask was intended.
+      ShowFunctionsTask.buildTsBlock(getPipePluginTableResp.getAllPipePluginInformation(), future);
+    } catch (ClientManagerException | TException e) {
+      future.setException(e);
+    }
+    return future;
+  }
+
   @Override
   public SettableFuture<ConfigTaskResult> setTTL(SetTTLStatement setTTLStatement, String taskName) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 03ac9fa524..08260d55a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreatePipePluginStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
@@ -86,6 +87,12 @@ public interface IConfigTaskExecutor {
 
   SettableFuture<ConfigTaskResult> showTriggers();
 
+  /** Executes a create pipe plugin statement; the future carries the outcome. */
+  SettableFuture<ConfigTaskResult> createPipePlugin(CreatePipePluginStatement createPipeStatement);
+
+  /** Drops the pipe plugin registered under {@code pluginName}; the future carries the outcome. */
+  SettableFuture<ConfigTaskResult> dropPipePlugin(String pluginName);
+
+  /** Executes a show pipe plugins statement; the future carries the outcome. */
+  SettableFuture<ConfigTaskResult> showPipePlugins();
+
   SettableFuture<ConfigTaskResult> setTTL(SetTTLStatement setTTLStatement, String taskName);
 
   SettableFuture<ConfigTaskResult> merge(boolean onCluster);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreatePipePluginTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreatePipePluginTask.java
new file mode 100644
index 0000000000..91c0d68dbb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreatePipePluginTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
+
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreatePipePluginStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class CreatePipePluginTask implements IConfigTask {
+  /** Statement received at construction; execute() forwards it to the executor as-is. */
+  private final CreatePipePluginStatement statement;
+
+  public CreatePipePluginTask(CreatePipePluginStatement createPipePluginStatement) {
+    statement = createPipePluginStatement;
+  }
+
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    return configTaskExecutor.createPipePlugin(statement);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropPipePluginTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropPipePluginTask.java
new file mode 100644
index 0000000000..a73bfff76b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropPipePluginTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
+
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropPipePluginStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class DropPipePluginTask implements IConfigTask {
+  /** Name of the plugin to drop, read from the statement when the task is constructed. */
+  private final String nameOfPluginToDrop;
+
+  public DropPipePluginTask(DropPipePluginStatement dropPipePluginStatement) {
+    nameOfPluginToDrop = dropPipePluginStatement.getPluginName();
+  }
+
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    return configTaskExecutor.dropPipePlugin(nameOfPluginToDrop);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java
new file mode 100644
index 0000000000..bab468ee77
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
+
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ShowPipePluginsTask implements IConfigTask {
+
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    return configTaskExecutor.showPipePlugins();
+  }
+
+  public static void buildTsBlock(
+      List<ByteBuffer> allPipePluginsInformation, SettableFuture<ConfigTaskResult> future) {
+    // Decode every serialized meta; a null input list simply yields zero rows.
+    final List<PipePluginMeta> metas = new ArrayList<>();
+    if (allPipePluginsInformation != null) {
+      allPipePluginsInformation.forEach(
+          serialized -> metas.add(PipePluginMeta.deserialize(serialized)));
+    }
+    metas.sort(Comparator.comparing(PipePluginMeta::getPluginName));
+
+    final List<TSDataType> columnTypes =
+        ColumnHeaderConstant.showPipePluginsColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList());
+    final TsBlockBuilder rows = new TsBlockBuilder(columnTypes);
+    for (final PipePluginMeta meta : metas) {
+      rows.getTimeColumnBuilder().writeLong(0L); // every row uses timestamp 0
+      rows.getColumnBuilder(0).writeBinary(Binary.valueOf(meta.getPluginName()));
+      rows.getColumnBuilder(1).writeBinary(Binary.valueOf(meta.getPluginType()));
+      rows.getColumnBuilder(2).writeBinary(Binary.valueOf(meta.getClassName()));
+      rows.declarePosition();
+    }
+
+    future.set(
+        new ConfigTaskResult(
+            TSStatusCode.SUCCESS_STATUS,
+            rows.build(),
+            DatasetHeaderFactory.getShowPipePluginsHeader()));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index c5ac196776..b31ccbd50f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -106,6 +106,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreatePipePluginStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement;
@@ -113,6 +114,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStateme
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropPipePluginStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.GetSeriesSlotListStatement;
@@ -127,6 +129,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowContinuousQueriesStat
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDataNodesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowFunctionsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowPipePluginsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowRegionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement;
@@ -713,13 +716,13 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
   // Create Function
   @Override
   public Statement visitCreateFunction(CreateFunctionContext ctx) {
-    if (ctx.uriClasue() == null) {
+    if (ctx.uriClause() == null) {
       return new CreateFunctionStatement(
           parseIdentifier(ctx.udfName.getText()),
           parseStringLiteral(ctx.className.getText()),
           false);
     } else {
-      String uriString = parseAndValidateURI(ctx.uriClasue());
+      String uriString = parseAndValidateURI(ctx.uriClause());
       return new CreateFunctionStatement(
           parseIdentifier(ctx.udfName.getText()),
           parseStringLiteral(ctx.className.getText()),
@@ -728,7 +731,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     }
   }
 
-  private String parseAndValidateURI(IoTDBSqlParser.UriClasueContext ctx) {
+  private String parseAndValidateURI(IoTDBSqlParser.UriClauseContext ctx) {
     String uriString = parseStringLiteral(ctx.uri().getText());
     try {
       URI uri = new URI(uriString);
@@ -768,7 +771,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
             parseAttributeValue(triggerAttributeContext.value));
       }
     }
-    if (ctx.uriClasue() == null) {
+    if (ctx.uriClause() == null) {
       return new CreateTriggerStatement(
           parseIdentifier(ctx.triggerName.getText()),
           parseStringLiteral(ctx.className.getText()),
@@ -781,7 +784,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
           parsePrefixPath(ctx.prefixPath()),
           attributes);
     } else {
-      String uriString = parseAndValidateURI(ctx.uriClasue());
+      String uriString = parseAndValidateURI(ctx.uriClause());
       return new CreateTriggerStatement(
           parseIdentifier(ctx.triggerName.getText()),
           parseStringLiteral(ctx.className.getText()),
@@ -808,6 +811,27 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     return new ShowTriggersStatement();
   }
 
+  // Create PipePlugin =====================================================================
+  @Override
+  public Statement visitCreatePipePlugin(IoTDBSqlParser.CreatePipePluginContext ctx) {
+    final String pluginName = parseIdentifier(ctx.pluginName.getText());
+    final String className = parseStringLiteral(ctx.className.getText());
+    final String uriString = parseAndValidateURI(ctx.uriClause()); // uriClause is not null-checked
+    return new CreatePipePluginStatement(pluginName, className, uriString);
+  }
+
+  // Drop PipePlugin =====================================================================
+  @Override
+  public Statement visitDropPipePlugin(IoTDBSqlParser.DropPipePluginContext ctx) {
+    return new DropPipePluginStatement(parseIdentifier(ctx.pluginName.getText()));
+  }
+
+  // Show PipePlugins =====================================================================
+  @Override
+  public Statement visitShowPipePlugins(IoTDBSqlParser.ShowPipePluginsContext ctx) {
+    return new ShowPipePluginsStatement();
+  }
+
   // Show Child Paths =====================================================================
   @Override
   public Statement visitShowChildPaths(IoTDBSqlParser.ShowChildPathsContext ctx) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
index c4d4fdfeec..df70af6c4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
@@ -152,5 +152,9 @@ public enum StatementType {
   DEACTIVATE_TEMPLATE,
 
   INTERNAL_BATCH_ACTIVATE_TEMPLATE,
-  INTERNAL_CREATE_MULTI_TIMESERIES
+  INTERNAL_CREATE_MULTI_TIMESERIES,
+
+  CREATE_PIPEPLUGIN,
+  DROP_PIPEPLUGIN,
+  SHOW_PIPEPLUGINS,
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 9cac1d0646..c9304ece7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesSt
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreatePipePluginStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement;
@@ -49,6 +50,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStateme
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropPipePluginStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.GetSeriesSlotListStatement;
@@ -63,6 +65,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowContinuousQueriesStat
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDataNodesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowFunctionsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowPipePluginsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowRegionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement;
@@ -214,6 +217,19 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(showTriggersStatement, context);
   }
 
+  // Pipe Plugin (each default below falls back to visitStatement)
+  public R visitCreatePipePlugin(CreatePipePluginStatement createPipePluginStatement, C context) {
+    return visitStatement(createPipePluginStatement, context);
+  }
+
+  public R visitDropPipePlugin(DropPipePluginStatement dropPipePluginStatement, C context) {
+    return visitStatement(dropPipePluginStatement, context);
+  }
+
+  public R visitShowPipePlugins(ShowPipePluginsStatement showPipePluginsStatement, C context) {
+    return visitStatement(showPipePluginsStatement, context);
+  }
+
   /** Data Manipulation Language (DML) */
 
   // Select Statement
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreatePipePluginStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreatePipePluginStatement.java
new file mode 100644
index 0000000000..332ee48c41
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreatePipePluginStatement.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class CreatePipePluginStatement extends Statement implements IConfigStatement {
+
+  private final String pluginName;
+  private final String className;
+  private final String uriString;
+
+  /** Keeps the three arguments verbatim and tags the statement as CREATE_PIPEPLUGIN. */
+  public CreatePipePluginStatement(String pluginName, String className, String uriString) {
+    this.pluginName = pluginName;
+    this.className = className;
+    this.uriString = uriString;
+    statementType = StatementType.CREATE_PIPEPLUGIN;
+  }
+
+  public String getPluginName() {
+    return pluginName;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public String getUriString() {
+    return uriString;
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitCreatePipePlugin(this, context);
+  }
+
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.WRITE;
+  }
+
+  @Override
+  public List<? extends PartialPath> getPaths() {
+    return Collections.emptyList(); // no paths are associated with this statement
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropPipePluginStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropPipePluginStatement.java
new file mode 100644
index 0000000000..a5d883962e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropPipePluginStatement.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class DropPipePluginStatement extends Statement implements IConfigStatement {
+
+  private final String pluginName;
+
+  /** Keeps {@code pluginName} verbatim and tags the statement as DROP_PIPEPLUGIN. */
+  public DropPipePluginStatement(String pluginName) {
+    this.pluginName = pluginName;
+    statementType = StatementType.DROP_PIPEPLUGIN;
+  }
+
+  public String getPluginName() {
+    return pluginName;
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitDropPipePlugin(this, context);
+  }
+
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.WRITE;
+  }
+
+  @Override
+  public List<? extends PartialPath> getPaths() {
+    return Collections.emptyList(); // no paths are associated with this statement
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowPipePluginsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowPipePluginsStatement.java
new file mode 100644
index 0000000000..c5466b3e15
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowPipePluginsStatement.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ShowPipePluginsStatement extends ShowStatement implements IConfigStatement {
+
+  /** Tags the statement as SHOW_PIPEPLUGINS after the implicit ShowStatement constructor. */
+  public ShowPipePluginsStatement() {
+    statementType = StatementType.SHOW_PIPEPLUGINS;
+  }
+
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.READ;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList(); // no paths are associated with this statement
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitShowPipePlugins(this, context);
+  }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 1702db68fe..b089d9248e 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -184,8 +184,14 @@ public enum TSStatusCode {
   CQ_UPDATE_LAST_EXEC_TIME_ERROR(1403),
 
   // ML Model
-  CREATE_MODEL_ERROR(1400),
-  DROP_MODEL_ERROR(1401);
+  CREATE_MODEL_ERROR(1500),
+  DROP_MODEL_ERROR(1501),
+
+  // Pipe Plugin
+  CREATE_PIPE_PLUGIN_ERROR(1600),
+  DROP_PIPE_PLUGIN_ERROR(1601),
+  PIPE_PLUGIN_LOAD_CLASS_ERROR(1602),
+  PIPE_PLUGIN_DOWNLOAD_ERROR(1603);
 
   private final int statusCode;
 
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index f27532fac5..72797d0119 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -435,6 +435,25 @@ struct TGetDataNodeLocationsResp {
   2: required list<common.TDataNodeLocation> dataNodeLocationList
 }
 
+// Pipe Plugin
+struct TCreatePipePluginReq {
+  1: required string pluginName
+  2: required string className
+  4: required string jarName // NOTE(review): field id 3 is skipped; confirm this is intended
+  5: required binary jarFile
+  6: required string jarMD5
+}
+
+struct TDropPipePluginReq {
+  1: required string pluginName
+}
+
+// Get PipePlugin table from config node
+struct TGetPipePluginTableResp {
+  1: required common.TSStatus status
+  2: required list<binary> allPipePluginInformation
+}
+
 // Show cluster
 struct TShowClusterResp {
   1: required common.TSStatus status
@@ -1055,6 +1074,31 @@ service IConfigNodeRPCService {
      */
   TGetJarInListResp getTriggerJar(TGetJarInListReq req)
 
+  // ======================================================
+  // Pipe Plugin
+  // ======================================================
+
+  /**
+   * Create a pipe plugin from the class name and jar carried in the request
+   *
+   * @return SUCCESS_STATUS if the pipe plugin was created successfully
+   *         EXECUTE_STATEMENT_ERROR if operations on any node failed
+   */
+  common.TSStatus createPipePlugin(TCreatePipePluginReq req)
+
+  /**
+   * Remove a pipe plugin on the DataNodes
+   *
+   * @return SUCCESS_STATUS if the pipe plugin was removed successfully
+   *         EXECUTE_STATEMENT_ERROR if operations on any node failed
+   */
+  common.TSStatus dropPipePlugin(TDropPipePluginReq req)
+
+  /**
+   * Return the pipe plugin table
+   */
+  TGetPipePluginTableResp getPipePluginTable();
+
   // ======================================================
   // Maintenance Tools
   // ======================================================