You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/04/24 03:11:47 UTC

[iotdb] branch master updated: [IOTDB-5788] Built-in pipe plug-in management mechanism (#9680)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e3e83fe8aa [IOTDB-5788] Built-in pipe plug-in management mechanism (#9680)
e3e83fe8aa is described below

commit e3e83fe8aae3733bc27845ade7ffa20df08c9a04
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Apr 24 11:11:40 2023 +0800

    [IOTDB-5788] Built-in pipe plug-in management mechanism (#9680)
    
    1. support built-in pipe plugins. do_nothing_processor & do_nothing_connector are added as built-in plugins.
    2. allow creating / dropping the same plugin multiple times without errors at the PipePluginAgent / PipePluginInfo level, to adapt to the retry policy of the Procedure mechanism.
    3. allow dropping a plugin that is not registered on the leader config node, so that users have a chance to repair an inconsistent registration state across different config / data nodes.
    4. fix ci build error: https://ci-builds.apache.org/job/IoTDB/job/IoTDB-pip-new/job/master/173/console
    
    ------
    
    1. 支持内置 Pipe 插件,为内置插件适配了增、删、启动流程
    2. 幂等操作支持:允许在 PipePluginAgent / PipePluginInfo 层面对同一个插件进行多次增、删(不对已经注册的插件进行检查,允许 procedure 的多次执行)
    3. 创建:仅仅在 coordinator 层面检查是否合规。删除:coordinator 不对是否创建过进行检查,允许用户多次 drop pipe plugin 来消除不一致状态(即使该插件已经在 leader confignode 上被 drop)
    4. 修复 ci build error: https://ci-builds.apache.org/job/IoTDB/job/IoTDB-pip-new/job/master/173/console
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  3 +-
 .../manager/pipe/PipePluginCoordinator.java        |  2 +-
 .../persistence/pipe/PipePluginInfo.java           | 37 ++++++-----
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |  6 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  1 -
 .../request/ConfigPhysicalPlanSerDeTest.java       |  2 +-
 .../iotdb/confignode/persistence/PipeInfoTest.java |  2 +-
 .../pipe/plugin/CreatePipePluginProcedureTest.java |  2 +-
 .../iotdb/it/env/cluster/AbstractNodeWrapper.java  |  1 +
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     | 55 +++++++++++++++++
 .../builtin/connector/DoNothingConnector.java      | 72 ++++++++++++++++++++++
 .../builtin/processor/DoNothingProcessor.java      | 68 ++++++++++++++++++++
 .../meta/ConfigNodePipePluginMetaKeeper.java       | 25 +++-----
 .../plugin/meta/DataNodePipePluginMetaKeeper.java  | 28 ++++++---
 .../commons/pipe/plugin/meta/PipePluginMeta.java   | 65 +++++++++++++------
 .../pipe/plugin/meta/PipePluginMetaKeeper.java     | 62 +++++++++++++++----
 .../db/mpp/common/header/ColumnHeaderConstant.java |  2 +
 .../config/executor/ClusterConfigTaskExecutor.java |  3 +-
 .../config/metadata/ShowPipePluginsTask.java       | 18 +++++-
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  |  5 +-
 .../db/pipe/agent/plugin/PipePluginAgent.java      | 41 +++++++++---
 .../java/org/apache/iotdb/db/service/DataNode.java |  6 ++
 22 files changed, 408 insertions(+), 98 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index c145d4fe21..5da25971ce 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -52,6 +52,8 @@ ddlStatement
     | createFunction | dropFunction | showFunctions
     // Trigger
     | createTrigger | dropTrigger | showTriggers | startTrigger | stopTrigger
+    // Pipe Plugin
+    | createPipePlugin | dropPipePlugin | showPipePlugins
     // CQ
     | createContinuousQuery | dropContinuousQuery | showContinuousQueries
     // Cluster
@@ -490,7 +492,6 @@ showPipePlugins
     : SHOW PIPEPLUGINS
     ;
 
-
 // ML Model =========================================================================================
 // ---- Create Model
 createModel
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
index 91da899463..8943a28e8c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
@@ -69,7 +69,7 @@ public class PipePluginCoordinator {
     final String jarName = req.getJarName();
     final String jarMD5 = req.getJarMD5();
     final PipePluginMeta pipePluginMeta =
-        new PipePluginMeta(pluginName, className, jarName, jarMD5);
+        new PipePluginMeta(pluginName, className, false, jarName, jarMD5);
 
     return configManager.getProcedureManager().createPipePlugin(pipePluginMeta, req.getJarFile());
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 0b47dccb6c..3818070dac 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -84,6 +84,7 @@ public class PipePluginInfo implements SnapshotProcessor {
   /////////////////////////////// Validator ///////////////////////////////
 
   public void validateBeforeCreatingPipePlugin(String pluginName, String jarName, String jarMD5) {
+    // both build-in and user defined pipe plugin should be unique
     if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
       throw new PipeManagementException(
           String.format(
@@ -100,12 +101,13 @@ public class PipePluginInfo implements SnapshotProcessor {
   }
 
   public void validateBeforeDroppingPipePlugin(String pluginName) {
-    if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
-      return;
+    if (pipePluginMetaKeeper.containsPipePlugin(pluginName)
+        && pipePluginMetaKeeper.getPipePluginMeta(pluginName).isBuiltin()) {
+      throw new PipeManagementException(
+          String.format(
+              "Failed to drop PipePlugin [%s], the PipePlugin is a built-in PipePlugin",
+              pluginName));
     }
-
-    throw new PipeManagementException(
-        String.format("Failed to drop PipePlugin [%s], the PipePlugin does not exist", pluginName));
   }
 
   public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(String jarName) {
@@ -119,32 +121,37 @@ public class PipePluginInfo implements SnapshotProcessor {
 
   /////////////////////////////// Pipe Plugin Management ///////////////////////////////
 
-  public TSStatus createPipePlugin(CreatePipePluginPlan physicalPlan) {
+  public TSStatus createPipePlugin(CreatePipePluginPlan createPipePluginPlan) {
     try {
-      final PipePluginMeta pipePluginMeta = physicalPlan.getPipePluginMeta();
+      final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
+
+      // try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
+      dropPipePlugin(new DropPipePluginPlan(pipePluginMeta.getPluginName()));
+
       pipePluginMetaKeeper.addPipePluginMeta(pipePluginMeta.getPluginName(), pipePluginMeta);
       pipePluginMetaKeeper.addJarNameAndMd5(
           pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5());
 
-      if (physicalPlan.getJarFile() != null) {
+      if (createPipePluginPlan.getJarFile() != null) {
         pipePluginExecutableManager.saveToInstallDir(
-            ByteBuffer.wrap(physicalPlan.getJarFile().getValues()), pipePluginMeta.getJarName());
+            ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
+            pipePluginMeta.getJarName());
       }
 
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (Exception e) {
       final String errorMessage =
           String.format(
-              "Failed to add PipePlugin [%s] in PipePlugin_Table on Config Nodes, because of %s",
-              physicalPlan.getPipePluginMeta().getPluginName(), e);
+              "Failed to execute createPipePlugin(%s) on config nodes, because of %s",
+              createPipePluginPlan.getPipePluginMeta().getPluginName(), e);
       LOGGER.warn(errorMessage, e);
       return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
           .setMessage(errorMessage);
     }
   }
 
-  public TSStatus dropPipePlugin(DropPipePluginPlan physicalPlan) {
-    final String pluginName = physicalPlan.getPluginName();
+  public TSStatus dropPipePlugin(DropPipePluginPlan dropPipePluginPlan) {
+    final String pluginName = dropPipePluginPlan.getPluginName();
 
     if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
       pipePluginMetaKeeper.removeJarNameAndMd5IfPossible(
@@ -161,10 +168,10 @@ public class PipePluginInfo implements SnapshotProcessor {
         Arrays.asList(pipePluginMetaKeeper.getAllPipePluginMeta()));
   }
 
-  public JarResp getPipePluginJar(GetPipePluginJarPlan physicalPlan) {
+  public JarResp getPipePluginJar(GetPipePluginJarPlan getPipePluginJarPlan) {
     try {
       List<ByteBuffer> jarList = new ArrayList<>();
-      for (String jarName : physicalPlan.getJarNames()) {
+      for (String jarName : getPipePluginJarPlan.getJarNames()) {
         jarList.add(
             ExecutableManager.transferToBytebuffer(
                 PipePluginExecutableManager.getInstance()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 6aa4d6109d..7c8268ecd0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -110,7 +110,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
     try {
       pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
     } catch (PipeManagementException e) {
-      // if the pipe plugin is not exist, we should end the procedure
+      // if the pipe plugin is a built-in plugin, we should not drop it
       LOGGER.warn(e.getMessage());
       setFailure(new ProcedureException(e.getMessage()));
       pipePluginCoordinator.unlock();
@@ -180,14 +180,14 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
     LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnDataNodes({})", pluginName);
 
     // do nothing but wait for rolling back to the previous state: LOCK
-    // TODO: we should drop the pipe plugin on data nodes
+    // TODO: we should drop the pipe plugin on data nodes properly with RuntimeAgent's help
   }
 
   private void rollbackFromDropOnConfigNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnConfigNodes({})", pluginName);
 
     // do nothing but wait for rolling back to the previous state: DROP_ON_DATA_NODES
-    // TODO: we should drop the pipe plugin on config nodes
+    // TODO: we should drop the pipe plugin on config nodes properly with RuntimeCoordinator's help
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 5413818646..04ddd4a019 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -903,7 +903,6 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
   }
 
   @Override
-  @Deprecated
   public TGetAllPipeInfoResp getAllPipeInfo() {
     return configManager.getAllPipeInfo();
   }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 675858f1e0..55a2e622aa 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -1098,7 +1098,7 @@ public class ConfigPhysicalPlanSerDeTest {
   public void CreatePipePluginPlanTest() throws IOException {
     CreatePipePluginPlan createPipePluginPlan =
         new CreatePipePluginPlan(
-            new PipePluginMeta("testPlugin", "org.apache.iotdb.testJar", "testJar", "???"),
+            new PipePluginMeta("testPlugin", "org.apache.iotdb.TestJar", false, "test.jar", "???"),
             new Binary("123"));
     CreatePipePluginPlan createPipePluginPlan1 =
         (CreatePipePluginPlan)
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index 9babc2d872..0246759c98 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -84,7 +84,7 @@ public class PipeInfoTest {
 
     CreatePipePluginPlan createPipePluginPlan =
         new CreatePipePluginPlan(
-            new PipePluginMeta("testPlugin", "org.apache.iotdb.testJar", "testJar", "???"),
+            new PipePluginMeta("testPlugin", "org.apache.iotdb.TestJar", false, "test.jar", "???"),
             new Binary("123"));
     pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan);
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
index 729e3a01f7..103ca7963f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
@@ -39,7 +39,7 @@ public class CreatePipePluginProcedureTest {
     DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
 
     PipePluginMeta pipePluginMeta =
-        new PipePluginMeta("test", "test.class", "test.jar", "testMD5test");
+        new PipePluginMeta("test", "test.class", false, "test.jar", "testMD5test");
     CreatePipePluginProcedure proc =
         new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3});
 
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
index a7bd8513cb..62a859bd45 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
@@ -127,6 +127,7 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
     // these properties can't be mutated.
     immutableCommonProperties.setProperty("udf_lib_dir", MppBaseConfig.NULL_VALUE);
     immutableCommonProperties.setProperty("trigger_lib_dir", MppBaseConfig.NULL_VALUE);
+    immutableCommonProperties.setProperty("pipe_lib_dir", MppBaseConfig.NULL_VALUE);
     immutableCommonProperties.setProperty("mqtt_host", MppBaseConfig.NULL_VALUE);
     immutableCommonProperties.setProperty("mqtt_port", MppBaseConfig.NULL_VALUE);
     immutableCommonProperties.setProperty("rest_service_port", MppBaseConfig.NULL_VALUE);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
new file mode 100644
index 0000000000..d6972646d4
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin;
+
+import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
+
+public enum BuiltinPipePlugin {
+
+  // processors
+  DO_NOTHING_PROCESSOR("do_nothing_processor", DoNothingProcessor.class),
+
+  // connectors
+  DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class),
+  ;
+
+  private final String pipePluginName;
+  private final Class<?> pipePluginClass;
+  private final String className;
+
+  BuiltinPipePlugin(String functionName, Class<?> pipePluginClass) {
+    this.pipePluginName = functionName;
+    this.pipePluginClass = pipePluginClass;
+    this.className = pipePluginClass.getName();
+  }
+
+  public String getPipePluginName() {
+    return pipePluginName;
+  }
+
+  public Class<?> getPipePluginClass() {
+    return pipePluginClass;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
new file mode 100644
index 0000000000..503005fefa
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
+
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+public class DoNothingConnector implements PipeConnector {
+
+  @Override
+  public void validate(PipeParameterValidator validator) {
+    // do nothing
+  }
+
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) {
+    // do nothing
+  }
+
+  @Override
+  public void handshake() {
+    // do nothing
+  }
+
+  @Override
+  public void heartbeat() {
+    // do nothing
+  }
+
+  @Override
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) {
+    // do nothing
+  }
+
+  @Override
+  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
+    // do nothing
+  }
+
+  @Override
+  public void transfer(DeletionEvent deletionEvent) {
+    // do nothing
+  }
+
+  @Override
+  public void close() {
+    // do nothing
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
new file mode 100644
index 0000000000..45634119db
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+import java.io.IOException;
+
+public class DoNothingProcessor implements PipeProcessor {
+
+  @Override
+  public void validate(PipeParameterValidator validator) {
+    // do nothing
+  }
+
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) {
+    // do nothing
+  }
+
+  @Override
+  public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+      throws IOException {
+    eventCollector.collectTabletInsertionEvent(tabletInsertionEvent);
+  }
+
+  @Override
+  public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
+      throws IOException {
+    eventCollector.collectTsFileInsertionEvent(tsFileInsertionEvent);
+  }
+
+  @Override
+  public void process(DeletionEvent deletionEvent, EventCollector eventCollector)
+      throws IOException {
+    eventCollector.collectDeletionEvent(deletionEvent);
+  }
+
+  @Override
+  public void close() {
+    // do nothing
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
index c02fdeadb6..7c3632e4b4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
@@ -33,7 +33,8 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
   protected final Map<String, Integer> jarNameToReferenceCountMap;
 
   public ConfigNodePipePluginMetaKeeper() {
-    super();
+    loadBuiltInPlugins();
+
     jarNameToMd5Map = new HashMap<>();
     jarNameToReferenceCountMap = new HashMap<>();
   }
@@ -67,6 +68,7 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
     }
   }
 
+  @Override
   public void processTakeSnapshot(OutputStream outputStream) throws IOException {
     ReadWriteIOUtils.write(jarNameToMd5Map.size(), outputStream);
     for (Map.Entry<String, String> entry : jarNameToMd5Map.entrySet()) {
@@ -75,14 +77,13 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
       ReadWriteIOUtils.write(jarNameToReferenceCountMap.get(entry.getKey()), outputStream);
     }
 
-    ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), outputStream);
-    for (PipePluginMeta pipePluginMeta : pipeNameToPipeMetaMap.values()) {
-      ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream);
-    }
+    super.processTakeSnapshot(outputStream);
   }
 
+  @Override
   public void processLoadSnapshot(InputStream inputStream) throws IOException {
-    clear();
+    jarNameToMd5Map.clear();
+    jarNameToReferenceCountMap.clear();
 
     final int jarSize = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < jarSize; i++) {
@@ -93,16 +94,6 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
       jarNameToReferenceCountMap.put(jarName, count);
     }
 
-    final int pipePluginMetaSize = ReadWriteIOUtils.readInt(inputStream);
-    for (int i = 0; i < pipePluginMetaSize; i++) {
-      final PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(inputStream);
-      addPipePluginMeta(pipePluginMeta.getPluginName().toUpperCase(), pipePluginMeta);
-    }
-  }
-
-  public void clear() {
-    pipeNameToPipeMetaMap.clear();
-    jarNameToMd5Map.clear();
-    jarNameToReferenceCountMap.clear();
+    super.processLoadSnapshot(inputStream);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java
index 64d7c8ef94..42656ad870 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.pipe.plugin.meta;
 
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
 
 import java.util.Map;
@@ -26,28 +27,39 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class DataNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
 
-  private final Map<String, Class<?>> pipeNameToPipeClassMap;
+  private final Map<String, Class<?>> pipePluginNameToClassMap;
 
   public DataNodePipePluginMetaKeeper() {
-    super();
-    pipeNameToPipeClassMap = new ConcurrentHashMap<>();
+    pipePluginNameToClassMap = new ConcurrentHashMap<>();
+
+    loadBuiltInPlugins();
+  }
+
+  @Override
+  protected void loadBuiltInPlugins() {
+    super.loadBuiltInPlugins();
+
+    for (final BuiltinPipePlugin builtinPipePlugin : BuiltinPipePlugin.values()) {
+      addPluginAndClass(
+          builtinPipePlugin.getPipePluginName(), builtinPipePlugin.getPipePluginClass());
+    }
   }
 
   public void addPluginAndClass(String pluginName, Class<?> clazz) {
-    pipeNameToPipeClassMap.put(pluginName.toUpperCase(), clazz);
+    pipePluginNameToClassMap.put(pluginName.toUpperCase(), clazz);
   }
 
   public Class<?> getPluginClass(String pluginName) {
-    return pipeNameToPipeClassMap.get(pluginName.toUpperCase());
+    return pipePluginNameToClassMap.get(pluginName.toUpperCase());
   }
 
   public void removePluginClass(String pluginName) {
-    pipeNameToPipeClassMap.remove(pluginName.toUpperCase());
+    pipePluginNameToClassMap.remove(pluginName.toUpperCase());
   }
 
   public void updatePluginClass(PipePluginMeta pipePluginMeta, PipePluginClassLoader classLoader)
       throws ClassNotFoundException {
-    Class<?> functionClass = Class.forName(pipePluginMeta.getClassName(), true, classLoader);
-    pipeNameToPipeClassMap.put(pipePluginMeta.getPluginName().toUpperCase(), functionClass);
+    final Class<?> functionClass = Class.forName(pipePluginMeta.getClassName(), true, classLoader);
+    pipePluginNameToClassMap.put(pipePluginMeta.getPluginName().toUpperCase(), functionClass);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
index 08bcc614f0..517d5f6384 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
@@ -26,24 +26,45 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 public class PipePluginMeta {
 
-  private String pluginName;
-
-  private String className;
-
-  private String jarName;
+  private final String pluginName;
+  private final String className;
+
+  // jarName and jarMD5 are used to identify the jar file.
+  // they could be null if the plugin is built-in. they should be both null or both not null.
+  private final boolean isBuiltin;
+  private final String jarName;
+  private final String jarMD5;
+
+  public PipePluginMeta(
+      String pluginName, String className, boolean isBuiltin, String jarName, String jarMD5) {
+    this.pluginName = Objects.requireNonNull(pluginName).toUpperCase();
+    this.className = Objects.requireNonNull(className);
+
+    this.isBuiltin = isBuiltin;
+    if (isBuiltin) {
+      this.jarName = jarName;
+      this.jarMD5 = jarMD5;
+    } else {
+      this.jarName = Objects.requireNonNull(jarName);
+      this.jarMD5 = Objects.requireNonNull(jarMD5);
+    }
+  }
 
-  private String jarMD5;
+  public PipePluginMeta(String pluginName, String className) {
+    this.pluginName = Objects.requireNonNull(pluginName).toUpperCase();
+    this.className = Objects.requireNonNull(className);
 
-  private PipePluginMeta() {}
+    this.isBuiltin = true;
+    this.jarName = null;
+    this.jarMD5 = null;
+  }
 
-  public PipePluginMeta(String pluginName, String className, String jarName, String jarMD5) {
-    this.pluginName = pluginName.toUpperCase();
-    this.className = className;
-    this.jarName = jarName;
-    this.jarMD5 = jarMD5;
+  public boolean isBuiltin() {
+    return isBuiltin;
   }
 
   public String getPluginName() {
@@ -72,17 +93,18 @@ public class PipePluginMeta {
   public void serialize(DataOutputStream outputStream) throws IOException {
     ReadWriteIOUtils.write(pluginName, outputStream);
     ReadWriteIOUtils.write(className, outputStream);
+    ReadWriteIOUtils.write(isBuiltin, outputStream);
     ReadWriteIOUtils.write(jarName, outputStream);
     ReadWriteIOUtils.write(jarMD5, outputStream);
   }
 
   public static PipePluginMeta deserialize(ByteBuffer byteBuffer) {
-    PipePluginMeta pipePluginMeta = new PipePluginMeta();
-    pipePluginMeta.pluginName = ReadWriteIOUtils.readString(byteBuffer);
-    pipePluginMeta.className = ReadWriteIOUtils.readString(byteBuffer);
-    pipePluginMeta.jarName = ReadWriteIOUtils.readString(byteBuffer);
-    pipePluginMeta.jarMD5 = ReadWriteIOUtils.readString(byteBuffer);
-    return pipePluginMeta;
+    final String pluginName = ReadWriteIOUtils.readString(byteBuffer);
+    final String className = ReadWriteIOUtils.readString(byteBuffer);
+    final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer);
+    final String jarName = ReadWriteIOUtils.readString(byteBuffer);
+    final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer);
+    return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5);
   }
 
   public static PipePluginMeta deserialize(InputStream inputStream) throws IOException {
@@ -101,8 +123,9 @@ public class PipePluginMeta {
     PipePluginMeta that = (PipePluginMeta) obj;
     return pluginName.equals(that.pluginName)
         && className.equals(that.className)
-        && jarName.equals(that.jarName)
-        && jarMD5.equals(that.jarMD5);
+        && isBuiltin == that.isBuiltin
+        && Objects.equals(jarName, that.jarName)
+        && Objects.equals(jarMD5, that.jarMD5);
   }
 
   @Override
@@ -119,6 +142,8 @@ public class PipePluginMeta {
         + ", className='"
         + className
         + '\''
+        + ", isBuiltin="
+        + isBuiltin
         + ", jarName='"
         + jarName
         + '\''
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
index 4417141106..9a47fcfb89 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
@@ -19,40 +19,78 @@
 
 package org.apache.iotdb.commons.pipe.plugin.meta;
 
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
 public abstract class PipePluginMetaKeeper {
 
-  protected final Map<String, PipePluginMeta> pipeNameToPipeMetaMap;
+  protected final Map<String, PipePluginMeta> pipePluginNameToMetaMap = new ConcurrentHashMap<>();
 
-  public PipePluginMetaKeeper() {
-    pipeNameToPipeMetaMap = new ConcurrentHashMap<>();
+  /** Registers the meta of every {@link BuiltinPipePlugin} enum constant in this keeper. */
+  protected void loadBuiltInPlugins() {
+    for (final BuiltinPipePlugin plugin : BuiltinPipePlugin.values()) {
+      final String builtinName = plugin.getPipePluginName();
+      // the two-argument PipePluginMeta constructor marks the meta as built-in
+      addPipePluginMeta(builtinName, new PipePluginMeta(builtinName, plugin.getClassName()));
+    }
   }
 
   public void addPipePluginMeta(String pluginName, PipePluginMeta pipePluginMeta) {
-    pipeNameToPipeMetaMap.put(pluginName.toUpperCase(), pipePluginMeta);
+    pipePluginNameToMetaMap.put(pluginName.toUpperCase(), pipePluginMeta);
   }
 
   public void removePipePluginMeta(String pluginName) {
-    pipeNameToPipeMetaMap.remove(pluginName.toUpperCase());
+    pipePluginNameToMetaMap.remove(pluginName.toUpperCase());
   }
 
   public PipePluginMeta getPipePluginMeta(String pluginName) {
-    return pipeNameToPipeMetaMap.get(pluginName.toUpperCase());
+    return pipePluginNameToMetaMap.get(pluginName.toUpperCase());
   }
 
   public PipePluginMeta[] getAllPipePluginMeta() {
-    return pipeNameToPipeMetaMap.values().toArray(new PipePluginMeta[0]);
+    return pipePluginNameToMetaMap.values().toArray(new PipePluginMeta[0]);
   }
 
   public boolean containsPipePlugin(String pluginName) {
-    return pipeNameToPipeMetaMap.containsKey(pluginName.toUpperCase());
+    return pipePluginNameToMetaMap.containsKey(pluginName.toUpperCase());
+  }
+
+  /**
+   * Writes the number of external (non built-in) plugin metas, followed by each of those metas.
+   */
+  protected void processTakeSnapshot(OutputStream outputStream) throws IOException {
+    // collect once so the written count always matches the number of serialized metas
+    final PipePluginMeta[] externalPluginMetas =
+        pipePluginNameToMetaMap.values().stream()
+            .filter(pipePluginMeta -> !pipePluginMeta.isBuiltin())
+            .toArray(PipePluginMeta[]::new);
+
+    ReadWriteIOUtils.write(externalPluginMetas.length, outputStream);
+    for (PipePluginMeta pipePluginMeta : externalPluginMetas) {
+      ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream);
+    }
   }
 
-  public void clear() {
-    pipeNameToPipeMetaMap.clear();
+  /**
+   * Replaces all external plugin metas with those read from the snapshot; built-in ones stay.
+   */
+  protected void processLoadSnapshot(InputStream inputStream) throws IOException {
+    // processTakeSnapshot skips built-in metas, so only external ones are dropped here
+    pipePluginNameToMetaMap.values().removeIf(pluginMeta -> !pluginMeta.isBuiltin());
+
+    final int pipePluginMetaSize = ReadWriteIOUtils.readInt(inputStream);
+    for (int i = 0; i < pipePluginMetaSize; i++) {
+      final PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(inputStream);
+      // addPipePluginMeta upper-cases the key itself, no extra normalization is needed
+      addPipePluginMeta(pipePluginMeta.getPluginName(), pipePluginMeta);
+    }
   }
 
   @Override
@@ -64,11 +102,11 @@ public abstract class PipePluginMetaKeeper {
       return false;
     }
     PipePluginMetaKeeper that = (PipePluginMetaKeeper) o;
-    return pipeNameToPipeMetaMap.equals(that.pipeNameToPipeMetaMap);
+    return pipePluginNameToMetaMap.equals(that.pipePluginNameToMetaMap);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(pipeNameToPipeMetaMap);
+    return Objects.hash(pipePluginNameToMetaMap);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index bab90568f3..7f2e41bf82 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -98,6 +98,7 @@ public class ColumnHeaderConstant {
 
   // column names for show pipe plugins statement
   public static final String PLUGIN_NAME = "PluginName";
+  public static final String PLUGIN_TYPE = "PluginType";
   public static final String PLUGIN_JAR = "PluginJar";
 
   // show cluster status
@@ -354,6 +355,7 @@ public class ColumnHeaderConstant {
   public static final List<ColumnHeader> showPipePluginsColumnHeaders =
       ImmutableList.of(
           new ColumnHeader(PLUGIN_NAME, TSDataType.TEXT),
+          new ColumnHeader(PLUGIN_TYPE, TSDataType.TEXT),
           new ColumnHeader(CLASS_NAME, TSDataType.TEXT),
           new ColumnHeader(PLUGIN_JAR, TSDataType.TEXT));
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 577b5b0f86..cecd844275 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -175,6 +175,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
 import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
+import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -762,7 +763,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
       try (PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) {
         // ensure that jar file contains the class and the class is a pipe plugin
         Class<?> clazz = Class.forName(createPipePluginStatement.getClassName(), true, classLoader);
-        clazz.getDeclaredConstructor().newInstance();
+        PipePlugin ignored = (PipePlugin) clazz.getDeclaredConstructor().newInstance();
       } catch (ClassNotFoundException
           | NoSuchMethodException
           | InstantiationException
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java
index a2900d6d97..d2bdcc9fee 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java
@@ -42,6 +42,11 @@ import java.util.stream.Collectors;
 
 public class ShowPipePluginsTask implements IConfigTask {
 
+  private static final Binary PIPE_PLUGIN_TYPE_BUILTIN = Binary.valueOf("Builtin");
+  private static final Binary PIPE_PLUGIN_TYPE_EXTERNAL = Binary.valueOf("External");
+
+  private static final Binary PIPE_JAR_NAME_EMPTY_FIELD = Binary.valueOf("");
+
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
       throws InterruptedException {
@@ -66,8 +71,17 @@ public class ShowPipePluginsTask implements IConfigTask {
     for (final PipePluginMeta pipePluginMeta : pipePluginMetaList) {
       builder.getTimeColumnBuilder().writeLong(0L);
       builder.getColumnBuilder(0).writeBinary(Binary.valueOf(pipePluginMeta.getPluginName()));
-      builder.getColumnBuilder(1).writeBinary(Binary.valueOf(pipePluginMeta.getClassName()));
-      builder.getColumnBuilder(2).writeBinary(Binary.valueOf(pipePluginMeta.getJarName()));
+      builder
+          .getColumnBuilder(1)
+          .writeBinary(
+              pipePluginMeta.isBuiltin() ? PIPE_PLUGIN_TYPE_BUILTIN : PIPE_PLUGIN_TYPE_EXTERNAL);
+      builder.getColumnBuilder(2).writeBinary(Binary.valueOf(pipePluginMeta.getClassName()));
+      builder
+          .getColumnBuilder(3)
+          .writeBinary(
+              pipePluginMeta.getJarName() == null
+                  ? PIPE_JAR_NAME_EMPTY_FIELD
+                  : Binary.valueOf(pipePluginMeta.getJarName()));
       builder.declarePosition();
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index 1c50fe5b40..a38d86f974 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.pipe.agent;
 
-import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
 import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
 import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
 import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
@@ -33,9 +32,7 @@ public class PipeAgent {
 
   /** Private constructor to prevent users from creating a new instance. */
   private PipeAgent() {
-    final DataNodePipePluginMetaKeeper pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
-
-    pipePluginAgent = PipePluginAgent.setupAndGetInstance(pipePluginMetaKeeper);
+    pipePluginAgent = PipePluginAgent.setupAndGetInstance();
     pipeTaskAgent = PipeTaskAgent.setupAndGetInstance();
     pipeRuntimeAgent = PipeRuntimeAgent.setupAndGetInstance();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index d4c8359bcc..1f5f70bd3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -58,6 +58,10 @@ public class PipePluginAgent {
   public void register(PipePluginMeta pipePluginMeta, ByteBuffer jarFile) throws Exception {
     acquireLock();
     try {
+      // try to deregister first to avoid inconsistent state
+      deregister(pipePluginMeta.getPluginName(), false);
+
+      // register process from here
       checkIfRegistered(pipePluginMeta);
       saveJarFileIfNeeded(pipePluginMeta.getJarName(), jarFile);
       doRegister(pipePluginMeta);
@@ -73,6 +77,15 @@ public class PipePluginAgent {
       return;
     }
 
+    if (information.isBuiltin()) {
+      String errorMessage =
+          String.format(
+              "Failed to register PipePlugin %s, because the given PipePlugin name is the same as a built-in PipePlugin name.",
+              pluginName);
+      LOGGER.warn(errorMessage);
+      throw new PipeManagementException(errorMessage);
+    }
+
     if (PipePluginExecutableManager.getInstance()
             .hasFileUnderInstallDir(pipePluginMeta.getJarName())
         && !PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
@@ -84,6 +97,9 @@ public class PipePluginAgent {
       LOGGER.warn(errMsg);
       throw new PipeManagementException(errMsg);
     }
+
+    // if the pipe plugin is already registered and the jar file is the same, do nothing
+    // we allow users to register the same pipe plugin multiple times without any error
   }
 
   private void saveJarFileIfNeeded(String jarName, ByteBuffer byteBuffer) throws IOException {
@@ -111,7 +127,7 @@ public class PipePluginAgent {
 
       final Class<?> pluginClass = Class.forName(className, true, currentActiveClassLoader);
       // ensure that it is a PipePlugin class
-      PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
+      final PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
 
       pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
       pipePluginMetaKeeper.addPluginAndClass(pluginName, pluginClass);
@@ -141,15 +157,21 @@ public class PipePluginAgent {
   public void deregister(String pluginName, boolean needToDeleteJar) throws Exception {
     acquireLock();
     try {
-      PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName);
-      if (information == null) {
-        return;
+      final PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName);
+
+      if (information != null && information.isBuiltin()) {
+        String errorMessage =
+            String.format("Failed to deregister builtin PipePlugin %s.", pluginName);
+        LOGGER.warn(errorMessage);
+        throw new PipeManagementException(errorMessage);
       }
 
+      // remove anyway
       pipePluginMetaKeeper.removePipePluginMeta(pluginName);
       pipePluginMetaKeeper.removePluginClass(pluginName);
 
-      if (needToDeleteJar) {
+      // if it is needed to delete jar file of the pipe plugin, delete both jar file and md5
+      if (information != null && needToDeleteJar) {
         PipePluginExecutableManager.getInstance().removeFileUnderLibRoot(information.getJarName());
         PipePluginExecutableManager.getInstance()
             .removeFileUnderTemporaryRoot(pluginName.toUpperCase() + ".txt");
@@ -188,18 +210,17 @@ public class PipePluginAgent {
 
   /////////////////////////  Singleton Instance Holder  /////////////////////////
 
-  private PipePluginAgent(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
-    this.pipePluginMetaKeeper = pipePluginMetaKeeper;
+  private PipePluginAgent() {
+    this.pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
   }
 
   private static class PipePluginAgentServiceHolder {
     private static PipePluginAgent instance = null;
   }
 
-  public static PipePluginAgent setupAndGetInstance(
-      DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+  public static PipePluginAgent setupAndGetInstance() {
     if (PipePluginAgentServiceHolder.instance == null) {
-      PipePluginAgentServiceHolder.instance = new PipePluginAgent(pipePluginMetaKeeper);
+      PipePluginAgentServiceHolder.instance = new PipePluginAgent();
     }
     return PipePluginAgentServiceHolder.instance;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 365eae6310..3e0f39ac4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -861,6 +861,9 @@ public class DataNode implements DataNodeMBean {
     // create instances of pipe plugins and do registration
     try {
       for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
+        if (meta.isBuiltin()) {
+          continue;
+        }
         PipeAgent.plugin().doRegister(meta);
       }
     } catch (Exception e) {
@@ -881,6 +884,9 @@ public class DataNode implements DataNodeMBean {
   private List<PipePluginMeta> getJarListForPipePlugin() {
     List<PipePluginMeta> res = new ArrayList<>();
     for (PipePluginMeta pipePluginMeta : resourcesInformationHolder.getPipePluginMetaList()) {
+      if (pipePluginMeta.isBuiltin()) {
+        continue;
+      }
       // If jar does not exist, add current pipePluginMeta to list
       if (!PipePluginExecutableManager.getInstance()
           .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {