You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/04/23 14:24:12 UTC

[iotdb] 02/04: 1. support built-in pipe plugins. 2. allow to create / drop the same function multiple times without errors on PipePluginAgent / PipePluginInfo to enable retry policy. 3. allow to drop a function that is not registered on leader config node so that user can have a chance to change the inconsistent registration state over different config/data nodes.

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5788
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 270802123d38f6bab29027964526522b46dafe14
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Apr 23 20:44:55 2023 +0800

    1. support built-in pipe plugins.
    2. allow to create / drop the same function multiple times without errors on PipePluginAgent / PipePluginInfo to enable retry policy.
    3. allow to drop a function that is not registered on leader config node so that user can have a chance to change the inconsistent registration state over different config/data nodes.
---
 .../persistence/pipe/PipePluginInfo.java           | 37 +++++++------
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |  6 +--
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  1 -
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     | 36 ++++++++++++-
 .../builtin/connector/DoNothingConnector.java      | 53 +++++++++++++++++-
 .../builtin/processor/DoNothingProcessor.java      | 49 ++++++++++++++++-
 .../meta/ConfigNodePipePluginMetaKeeper.java       | 25 +++------
 .../plugin/meta/DataNodePipePluginMetaKeeper.java  | 28 +++++++---
 .../commons/pipe/plugin/meta/PipePluginMeta.java   | 57 +++++++++++++-------
 .../pipe/plugin/meta/PipePluginMetaKeeper.java     | 62 +++++++++++++++++-----
 .../config/executor/ClusterConfigTaskExecutor.java |  3 +-
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  |  5 +-
 .../db/pipe/agent/plugin/PipePluginAgent.java      | 41 ++++++++++----
 13 files changed, 311 insertions(+), 92 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 0b47dccb6c..3818070dac 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -84,6 +84,7 @@ public class PipePluginInfo implements SnapshotProcessor {
   /////////////////////////////// Validator ///////////////////////////////
 
   public void validateBeforeCreatingPipePlugin(String pluginName, String jarName, String jarMD5) {
+    // both build-in and user defined pipe plugin should be unique
     if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
       throw new PipeManagementException(
           String.format(
@@ -100,12 +101,13 @@ public class PipePluginInfo implements SnapshotProcessor {
   }
 
   public void validateBeforeDroppingPipePlugin(String pluginName) {
-    if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
-      return;
+    if (pipePluginMetaKeeper.containsPipePlugin(pluginName)
+        && pipePluginMetaKeeper.getPipePluginMeta(pluginName).isBuiltin()) {
+      throw new PipeManagementException(
+          String.format(
+              "Failed to drop PipePlugin [%s], the PipePlugin is a built-in PipePlugin",
+              pluginName));
     }
-
-    throw new PipeManagementException(
-        String.format("Failed to drop PipePlugin [%s], the PipePlugin does not exist", pluginName));
   }
 
   public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(String jarName) {
@@ -119,32 +121,37 @@ public class PipePluginInfo implements SnapshotProcessor {
 
   /////////////////////////////// Pipe Plugin Management ///////////////////////////////
 
-  public TSStatus createPipePlugin(CreatePipePluginPlan physicalPlan) {
+  public TSStatus createPipePlugin(CreatePipePluginPlan createPipePluginPlan) {
     try {
-      final PipePluginMeta pipePluginMeta = physicalPlan.getPipePluginMeta();
+      final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
+
+      // try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
+      dropPipePlugin(new DropPipePluginPlan(pipePluginMeta.getPluginName()));
+
       pipePluginMetaKeeper.addPipePluginMeta(pipePluginMeta.getPluginName(), pipePluginMeta);
       pipePluginMetaKeeper.addJarNameAndMd5(
           pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5());
 
-      if (physicalPlan.getJarFile() != null) {
+      if (createPipePluginPlan.getJarFile() != null) {
         pipePluginExecutableManager.saveToInstallDir(
-            ByteBuffer.wrap(physicalPlan.getJarFile().getValues()), pipePluginMeta.getJarName());
+            ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
+            pipePluginMeta.getJarName());
       }
 
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (Exception e) {
       final String errorMessage =
           String.format(
-              "Failed to add PipePlugin [%s] in PipePlugin_Table on Config Nodes, because of %s",
-              physicalPlan.getPipePluginMeta().getPluginName(), e);
+              "Failed to execute createPipePlugin(%s) on config nodes, because of %s",
+              createPipePluginPlan.getPipePluginMeta().getPluginName(), e);
       LOGGER.warn(errorMessage, e);
       return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
           .setMessage(errorMessage);
     }
   }
 
-  public TSStatus dropPipePlugin(DropPipePluginPlan physicalPlan) {
-    final String pluginName = physicalPlan.getPluginName();
+  public TSStatus dropPipePlugin(DropPipePluginPlan dropPipePluginPlan) {
+    final String pluginName = dropPipePluginPlan.getPluginName();
 
     if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
       pipePluginMetaKeeper.removeJarNameAndMd5IfPossible(
@@ -161,10 +168,10 @@ public class PipePluginInfo implements SnapshotProcessor {
         Arrays.asList(pipePluginMetaKeeper.getAllPipePluginMeta()));
   }
 
-  public JarResp getPipePluginJar(GetPipePluginJarPlan physicalPlan) {
+  public JarResp getPipePluginJar(GetPipePluginJarPlan getPipePluginJarPlan) {
     try {
       List<ByteBuffer> jarList = new ArrayList<>();
-      for (String jarName : physicalPlan.getJarNames()) {
+      for (String jarName : getPipePluginJarPlan.getJarNames()) {
         jarList.add(
             ExecutableManager.transferToBytebuffer(
                 PipePluginExecutableManager.getInstance()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 6aa4d6109d..7c8268ecd0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -110,7 +110,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
     try {
       pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
     } catch (PipeManagementException e) {
-      // if the pipe plugin is not exist, we should end the procedure
+      // if the pipe plugin is a built-in plugin, we should not drop it
       LOGGER.warn(e.getMessage());
       setFailure(new ProcedureException(e.getMessage()));
       pipePluginCoordinator.unlock();
@@ -180,14 +180,14 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
     LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnDataNodes({})", pluginName);
 
     // do nothing but wait for rolling back to the previous state: LOCK
-    // TODO: we should drop the pipe plugin on data nodes
+    // TODO: we should drop the pipe plugin on data nodes properly with RuntimeAgent's help
   }
 
   private void rollbackFromDropOnConfigNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnConfigNodes({})", pluginName);
 
     // do nothing but wait for rolling back to the previous state: DROP_ON_DATA_NODES
-    // TODO: we should drop the pipe plugin on config nodes
+    // TODO: we should drop the pipe plugin on config nodes properly with RuntimeCoordinator's help
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 2e80a9b0d1..efc3541a8c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -897,7 +897,6 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
   }
 
   @Override
-  @Deprecated
   public TGetAllPipeInfoResp getAllPipeInfo() {
     return configManager.getAllPipeInfo();
   }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 2e21cbf893..d6972646d4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -17,5 +17,39 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.plugin.builtin;public enum BuiltinPipePlugin {
+package org.apache.iotdb.commons.pipe.plugin.builtin;
+
+import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
+
+public enum BuiltinPipePlugin {
+
+  // processors
+  DO_NOTHING_PROCESSOR("do_nothing_processor", DoNothingProcessor.class),
+
+  // connectors
+  DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class),
+  ;
+
+  private final String pipePluginName;
+  private final Class<?> pipePluginClass;
+  private final String className;
+
+  BuiltinPipePlugin(String functionName, Class<?> pipePluginClass) {
+    this.pipePluginName = functionName;
+    this.pipePluginClass = pipePluginClass;
+    this.className = pipePluginClass.getName();
+  }
+
+  public String getPipePluginName() {
+    return pipePluginName;
+  }
+
+  public Class<?> getPipePluginClass() {
+    return pipePluginClass;
+  }
+
+  public String getClassName() {
+    return className;
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
index e50d3c3c1e..503005fefa 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
@@ -17,5 +17,56 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.plugin.builtin.connector;public class DoNothingConnector {
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
+
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+public class DoNothingConnector implements PipeConnector {
+
+  @Override
+  public void validate(PipeParameterValidator validator) {
+    // do nothing
+  }
+
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) {
+    // do nothing
+  }
+
+  @Override
+  public void handshake() {
+    // do nothing
+  }
+
+  @Override
+  public void heartbeat() {
+    // do nothing
+  }
+
+  @Override
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) {
+    // do nothing
+  }
+
+  @Override
+  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
+    // do nothing
+  }
+
+  @Override
+  public void transfer(DeletionEvent deletionEvent) {
+    // do nothing
+  }
+
+  @Override
+  public void close() {
+    // do nothing
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
index 572a9d365a..45634119db 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -17,5 +17,52 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.plugin.builtin.processor;public class DoNothingProcessor {
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+import java.io.IOException;
+
+public class DoNothingProcessor implements PipeProcessor {
+
+  @Override
+  public void validate(PipeParameterValidator validator) {
+    // do nothing
+  }
+
+  @Override
+  public void customize(
+      PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) {
+    // do nothing
+  }
+
+  @Override
+  public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+      throws IOException {
+    eventCollector.collectTabletInsertionEvent(tabletInsertionEvent);
+  }
+
+  @Override
+  public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
+      throws IOException {
+    eventCollector.collectTsFileInsertionEvent(tsFileInsertionEvent);
+  }
+
+  @Override
+  public void process(DeletionEvent deletionEvent, EventCollector eventCollector)
+      throws IOException {
+    eventCollector.collectDeletionEvent(deletionEvent);
+  }
+
+  @Override
+  public void close() {
+    // do nothing
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
index c02fdeadb6..7c3632e4b4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
@@ -33,7 +33,8 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
   protected final Map<String, Integer> jarNameToReferenceCountMap;
 
   public ConfigNodePipePluginMetaKeeper() {
-    super();
+    loadBuiltInPlugins();
+
     jarNameToMd5Map = new HashMap<>();
     jarNameToReferenceCountMap = new HashMap<>();
   }
@@ -67,6 +68,7 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
     }
   }
 
+  @Override
   public void processTakeSnapshot(OutputStream outputStream) throws IOException {
     ReadWriteIOUtils.write(jarNameToMd5Map.size(), outputStream);
     for (Map.Entry<String, String> entry : jarNameToMd5Map.entrySet()) {
@@ -75,14 +77,13 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
       ReadWriteIOUtils.write(jarNameToReferenceCountMap.get(entry.getKey()), outputStream);
     }
 
-    ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), outputStream);
-    for (PipePluginMeta pipePluginMeta : pipeNameToPipeMetaMap.values()) {
-      ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream);
-    }
+    super.processTakeSnapshot(outputStream);
   }
 
+  @Override
   public void processLoadSnapshot(InputStream inputStream) throws IOException {
-    clear();
+    jarNameToMd5Map.clear();
+    jarNameToReferenceCountMap.clear();
 
     final int jarSize = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < jarSize; i++) {
@@ -93,16 +94,6 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
       jarNameToReferenceCountMap.put(jarName, count);
     }
 
-    final int pipePluginMetaSize = ReadWriteIOUtils.readInt(inputStream);
-    for (int i = 0; i < pipePluginMetaSize; i++) {
-      final PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(inputStream);
-      addPipePluginMeta(pipePluginMeta.getPluginName().toUpperCase(), pipePluginMeta);
-    }
-  }
-
-  public void clear() {
-    pipeNameToPipeMetaMap.clear();
-    jarNameToMd5Map.clear();
-    jarNameToReferenceCountMap.clear();
+    super.processLoadSnapshot(inputStream);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java
index 64d7c8ef94..42656ad870 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.pipe.plugin.meta;
 
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
 
 import java.util.Map;
@@ -26,28 +27,39 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class DataNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
 
-  private final Map<String, Class<?>> pipeNameToPipeClassMap;
+  private final Map<String, Class<?>> pipePluginNameToClassMap;
 
   public DataNodePipePluginMetaKeeper() {
-    super();
-    pipeNameToPipeClassMap = new ConcurrentHashMap<>();
+    pipePluginNameToClassMap = new ConcurrentHashMap<>();
+
+    loadBuiltInPlugins();
+  }
+
+  @Override
+  protected void loadBuiltInPlugins() {
+    super.loadBuiltInPlugins();
+
+    for (final BuiltinPipePlugin builtinPipePlugin : BuiltinPipePlugin.values()) {
+      addPluginAndClass(
+          builtinPipePlugin.getPipePluginName(), builtinPipePlugin.getPipePluginClass());
+    }
   }
 
   public void addPluginAndClass(String pluginName, Class<?> clazz) {
-    pipeNameToPipeClassMap.put(pluginName.toUpperCase(), clazz);
+    pipePluginNameToClassMap.put(pluginName.toUpperCase(), clazz);
   }
 
   public Class<?> getPluginClass(String pluginName) {
-    return pipeNameToPipeClassMap.get(pluginName.toUpperCase());
+    return pipePluginNameToClassMap.get(pluginName.toUpperCase());
   }
 
   public void removePluginClass(String pluginName) {
-    pipeNameToPipeClassMap.remove(pluginName.toUpperCase());
+    pipePluginNameToClassMap.remove(pluginName.toUpperCase());
   }
 
   public void updatePluginClass(PipePluginMeta pipePluginMeta, PipePluginClassLoader classLoader)
       throws ClassNotFoundException {
-    Class<?> functionClass = Class.forName(pipePluginMeta.getClassName(), true, classLoader);
-    pipeNameToPipeClassMap.put(pipePluginMeta.getPluginName().toUpperCase(), functionClass);
+    final Class<?> functionClass = Class.forName(pipePluginMeta.getClassName(), true, classLoader);
+    pipePluginNameToClassMap.put(pipePluginMeta.getPluginName().toUpperCase(), functionClass);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
index 08bcc614f0..5b11cd4d44 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
@@ -26,24 +26,39 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 public class PipePluginMeta {
 
-  private String pluginName;
+  private final String pluginName;
+  private final String className;
 
-  private String className;
+  // jarName and jarMD5 are used to identify the jar file.
+  // they could be null if the plugin is built-in. they should be both null or both not null.
+  private final boolean isBuiltin;
+  private final String jarName;
+  private final String jarMD5;
 
-  private String jarName;
+  public PipePluginMeta(String pluginName, String className, String jarName, String jarMD5) {
+    this.pluginName = Objects.requireNonNull(pluginName).toUpperCase();
+    this.className = Objects.requireNonNull(className);
 
-  private String jarMD5;
+    isBuiltin = false;
+    this.jarName = Objects.requireNonNull(jarName);
+    this.jarMD5 = Objects.requireNonNull(jarMD5);
+  }
 
-  private PipePluginMeta() {}
+  public PipePluginMeta(String pluginName, String className) {
+    this.pluginName = Objects.requireNonNull(pluginName).toUpperCase();
+    this.className = Objects.requireNonNull(className);
 
-  public PipePluginMeta(String pluginName, String className, String jarName, String jarMD5) {
-    this.pluginName = pluginName.toUpperCase();
-    this.className = className;
-    this.jarName = jarName;
-    this.jarMD5 = jarMD5;
+    this.isBuiltin = true;
+    this.jarName = null;
+    this.jarMD5 = null;
+  }
+
+  public boolean isBuiltin() {
+    return isBuiltin;
   }
 
   public String getPluginName() {
@@ -69,6 +84,10 @@ public class PipePluginMeta {
     return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
   }
 
+  /**
+   * All built-in plugins' information is kept in Java class {@code BuiltinPipePlugin }. So we never
+   * serialize the built-in plugins, then we don't need to serialize the isBuiltin field.
+   */
   public void serialize(DataOutputStream outputStream) throws IOException {
     ReadWriteIOUtils.write(pluginName, outputStream);
     ReadWriteIOUtils.write(className, outputStream);
@@ -77,12 +96,11 @@ public class PipePluginMeta {
   }
 
   public static PipePluginMeta deserialize(ByteBuffer byteBuffer) {
-    PipePluginMeta pipePluginMeta = new PipePluginMeta();
-    pipePluginMeta.pluginName = ReadWriteIOUtils.readString(byteBuffer);
-    pipePluginMeta.className = ReadWriteIOUtils.readString(byteBuffer);
-    pipePluginMeta.jarName = ReadWriteIOUtils.readString(byteBuffer);
-    pipePluginMeta.jarMD5 = ReadWriteIOUtils.readString(byteBuffer);
-    return pipePluginMeta;
+    final String pluginName = ReadWriteIOUtils.readString(byteBuffer);
+    final String className = ReadWriteIOUtils.readString(byteBuffer);
+    final String jarName = ReadWriteIOUtils.readString(byteBuffer);
+    final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer);
+    return new PipePluginMeta(pluginName, className, jarName, jarMD5);
   }
 
   public static PipePluginMeta deserialize(InputStream inputStream) throws IOException {
@@ -101,8 +119,9 @@ public class PipePluginMeta {
     PipePluginMeta that = (PipePluginMeta) obj;
     return pluginName.equals(that.pluginName)
         && className.equals(that.className)
-        && jarName.equals(that.jarName)
-        && jarMD5.equals(that.jarMD5);
+        && isBuiltin == that.isBuiltin
+        && Objects.equals(jarName, that.jarName)
+        && Objects.equals(jarMD5, that.jarMD5);
   }
 
   @Override
@@ -119,6 +138,8 @@ public class PipePluginMeta {
         + ", className='"
         + className
         + '\''
+        + ", isBuiltin="
+        + isBuiltin
         + ", jarName='"
         + jarName
         + '\''
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
index 4417141106..9a47fcfb89 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
@@ -19,40 +19,78 @@
 
 package org.apache.iotdb.commons.pipe.plugin.meta;
 
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
 public abstract class PipePluginMetaKeeper {
 
-  protected final Map<String, PipePluginMeta> pipeNameToPipeMetaMap;
+  protected final Map<String, PipePluginMeta> pipePluginNameToMetaMap = new ConcurrentHashMap<>();
 
-  public PipePluginMetaKeeper() {
-    pipeNameToPipeMetaMap = new ConcurrentHashMap<>();
+  protected void loadBuiltInPlugins() {
+    for (final BuiltinPipePlugin builtinPipePlugin : BuiltinPipePlugin.values()) {
+      addPipePluginMeta(
+          builtinPipePlugin.getPipePluginName(),
+          new PipePluginMeta(
+              builtinPipePlugin.getPipePluginName(), builtinPipePlugin.getClassName()));
+    }
   }
 
   public void addPipePluginMeta(String pluginName, PipePluginMeta pipePluginMeta) {
-    pipeNameToPipeMetaMap.put(pluginName.toUpperCase(), pipePluginMeta);
+    pipePluginNameToMetaMap.put(pluginName.toUpperCase(), pipePluginMeta);
   }
 
   public void removePipePluginMeta(String pluginName) {
-    pipeNameToPipeMetaMap.remove(pluginName.toUpperCase());
+    pipePluginNameToMetaMap.remove(pluginName.toUpperCase());
   }
 
   public PipePluginMeta getPipePluginMeta(String pluginName) {
-    return pipeNameToPipeMetaMap.get(pluginName.toUpperCase());
+    return pipePluginNameToMetaMap.get(pluginName.toUpperCase());
   }
 
   public PipePluginMeta[] getAllPipePluginMeta() {
-    return pipeNameToPipeMetaMap.values().toArray(new PipePluginMeta[0]);
+    return pipePluginNameToMetaMap.values().toArray(new PipePluginMeta[0]);
   }
 
   public boolean containsPipePlugin(String pluginName) {
-    return pipeNameToPipeMetaMap.containsKey(pluginName.toUpperCase());
+    return pipePluginNameToMetaMap.containsKey(pluginName.toUpperCase());
+  }
+
+  protected void processTakeSnapshot(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(
+        (int)
+            pipePluginNameToMetaMap.values().stream()
+                .filter(pipePluginMeta -> !pipePluginMeta.isBuiltin())
+                .count(),
+        outputStream);
+
+    for (PipePluginMeta pipePluginMeta : pipePluginNameToMetaMap.values()) {
+      if (pipePluginMeta.isBuiltin()) {
+        continue;
+      }
+      ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream);
+    }
   }
 
-  public void clear() {
-    pipeNameToPipeMetaMap.clear();
+  protected void processLoadSnapshot(InputStream inputStream) throws IOException {
+    pipePluginNameToMetaMap.forEach(
+        (pluginName, pluginMeta) -> {
+          if (!pluginMeta.isBuiltin()) {
+            pipePluginNameToMetaMap.remove(pluginName);
+          }
+        });
+
+    final int pipePluginMetaSize = ReadWriteIOUtils.readInt(inputStream);
+    for (int i = 0; i < pipePluginMetaSize; i++) {
+      final PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(inputStream);
+      addPipePluginMeta(pipePluginMeta.getPluginName().toUpperCase(), pipePluginMeta);
+    }
   }
 
   @Override
@@ -64,11 +102,11 @@ public abstract class PipePluginMetaKeeper {
       return false;
     }
     PipePluginMetaKeeper that = (PipePluginMetaKeeper) o;
-    return pipeNameToPipeMetaMap.equals(that.pipeNameToPipeMetaMap);
+    return pipePluginNameToMetaMap.equals(that.pipePluginNameToMetaMap);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(pipeNameToPipeMetaMap);
+    return Objects.hash(pipePluginNameToMetaMap);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 2dfbdef2c8..9f98d4979e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -172,6 +172,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
 import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
+import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -759,7 +760,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
       try (PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) {
         // ensure that jar file contains the class and the class is a pipe plugin
         Class<?> clazz = Class.forName(createPipePluginStatement.getClassName(), true, classLoader);
-        clazz.getDeclaredConstructor().newInstance();
+        PipePlugin ignored = (PipePlugin) clazz.getDeclaredConstructor().newInstance();
       } catch (ClassNotFoundException
           | NoSuchMethodException
           | InstantiationException
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index 1c50fe5b40..a38d86f974 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.pipe.agent;
 
-import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
 import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
 import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
 import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
@@ -33,9 +32,7 @@ public class PipeAgent {
 
   /** Private constructor to prevent users from creating a new instance. */
   private PipeAgent() {
-    final DataNodePipePluginMetaKeeper pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
-
-    pipePluginAgent = PipePluginAgent.setupAndGetInstance(pipePluginMetaKeeper);
+    pipePluginAgent = PipePluginAgent.setupAndGetInstance();
     pipeTaskAgent = PipeTaskAgent.setupAndGetInstance();
     pipeRuntimeAgent = PipeRuntimeAgent.setupAndGetInstance();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index d4c8359bcc..1f5f70bd3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -58,6 +58,10 @@ public class PipePluginAgent {
   public void register(PipePluginMeta pipePluginMeta, ByteBuffer jarFile) throws Exception {
     acquireLock();
     try {
+      // try to deregister first to avoid inconsistent state
+      deregister(pipePluginMeta.getPluginName(), false);
+
+      // the registration process starts here
       checkIfRegistered(pipePluginMeta);
       saveJarFileIfNeeded(pipePluginMeta.getJarName(), jarFile);
       doRegister(pipePluginMeta);
@@ -73,6 +77,15 @@ public class PipePluginAgent {
       return;
     }
 
+    if (information.isBuiltin()) {
+      String errorMessage =
+          String.format(
+              "Failed to register PipePlugin %s, because the given PipePlugin name is the same as a built-in PipePlugin name.",
+              pluginName);
+      LOGGER.warn(errorMessage);
+      throw new PipeManagementException(errorMessage);
+    }
+
     if (PipePluginExecutableManager.getInstance()
             .hasFileUnderInstallDir(pipePluginMeta.getJarName())
         && !PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
@@ -84,6 +97,9 @@ public class PipePluginAgent {
       LOGGER.warn(errMsg);
       throw new PipeManagementException(errMsg);
     }
+
+    // if the pipe plugin is already registered and the jar file is the same, do nothing
+    // we allow users to register the same pipe plugin multiple times without any error
   }
 
   private void saveJarFileIfNeeded(String jarName, ByteBuffer byteBuffer) throws IOException {
@@ -111,7 +127,7 @@ public class PipePluginAgent {
 
       final Class<?> pluginClass = Class.forName(className, true, currentActiveClassLoader);
       // ensure that it is a PipePlugin class
-      PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
+      final PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
 
       pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
       pipePluginMetaKeeper.addPluginAndClass(pluginName, pluginClass);
@@ -141,15 +157,21 @@ public class PipePluginAgent {
   public void deregister(String pluginName, boolean needToDeleteJar) throws Exception {
     acquireLock();
     try {
-      PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName);
-      if (information == null) {
-        return;
+      final PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName);
+
+      if (information != null && information.isBuiltin()) {
+        String errorMessage =
+            String.format("Failed to deregister builtin PipePlugin %s.", pluginName);
+        LOGGER.warn(errorMessage);
+        throw new PipeManagementException(errorMessage);
       }
 
+      // remove anyway, even if no meta was found for the plugin name
       pipePluginMetaKeeper.removePipePluginMeta(pluginName);
       pipePluginMetaKeeper.removePluginClass(pluginName);
 
-      if (needToDeleteJar) {
+      // if the jar file of the pipe plugin needs to be deleted, delete both the jar file and md5
+      if (information != null && needToDeleteJar) {
         PipePluginExecutableManager.getInstance().removeFileUnderLibRoot(information.getJarName());
         PipePluginExecutableManager.getInstance()
             .removeFileUnderTemporaryRoot(pluginName.toUpperCase() + ".txt");
@@ -188,18 +210,17 @@ public class PipePluginAgent {
 
   /////////////////////////  Singleton Instance Holder  /////////////////////////
 
-  private PipePluginAgent(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
-    this.pipePluginMetaKeeper = pipePluginMetaKeeper;
+  private PipePluginAgent() {
+    this.pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
   }
 
   private static class PipePluginAgentServiceHolder {
     private static PipePluginAgent instance = null;
   }
 
-  public static PipePluginAgent setupAndGetInstance(
-      DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+  public static PipePluginAgent setupAndGetInstance() {
     if (PipePluginAgentServiceHolder.instance == null) {
-      PipePluginAgentServiceHolder.instance = new PipePluginAgent(pipePluginMetaKeeper);
+      PipePluginAgentServiceHolder.instance = new PipePluginAgent();
     }
     return PipePluginAgentServiceHolder.instance;
   }