You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/14 19:24:25 UTC

[iotdb] branch master updated: [IOTDB-5873] Pipe: Support `CREATE`, `DROP`, `START`, `STOP` & `SHOW` Clauses (#9849)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 803c87fcdb0 [IOTDB-5873] Pipe: Support `CREATE`, `DROP`, `START`, `STOP` & `SHOW` Clauses (#9849)
803c87fcdb0 is described below

commit 803c87fcdb097292463eb23d387d7032c275aacf
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon May 15 03:24:18 2023 +0800

    [IOTDB-5873] Pipe: Support `CREATE`, `DROP`, `START`, `STOP` & `SHOW` Clauses (#9849)
---
 .../response/pipe/task/PipeTableResp.java          | 76 ++++++++++++++++++-
 .../iotdb/confignode/manager/ConfigManager.java    | 11 +--
 .../manager/pipe/PipeTaskCoordinator.java          | 11 ++-
 .../persistence/pipe/PipePluginInfo.java           | 49 +++++++++++-
 .../runtime/PipeHandleLeaderChangeProcedure.java   |  4 +-
 .../pipe/task/AbstractOperatePipeProcedureV2.java  |  2 +-
 .../impl/pipe/task/CreatePipeProcedureV2.java      | 16 ++--
 .../procedure/store/ProcedureFactory.java          | 10 +++
 .../iotdb/commons/pipe/task/meta/PipeMeta.java     |  9 +--
 .../commons/pipe/task/meta/PipeRuntimeMeta.java    | 27 ++++++-
 .../commons/pipe/task/meta/PipeStaticMeta.java     | 88 ++++++++++++++++------
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 27 ++++++-
 .../commons/pipe/task/meta/PipeMetaDeSerTest.java  | 77 +++++++++++++++++++
 .../config/executor/ClusterConfigTaskExecutor.java |  5 +-
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       | 12 ++-
 .../db/pipe/config/PipeCollectorConstant.java      |  5 +-
 .../realtime/PipeRealtimeDataRegionCollector.java  |  6 +-
 .../connector/PipeConnectorSubtaskManager.java     | 23 +++++-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java | 25 ++++--
 .../db/pipe/task/stage/PipeTaskProcessorStage.java | 44 ++++++++---
 .../impl/DataNodeInternalRPCServiceImpl.java       |  6 +-
 .../collector/CachedSchemaPatternMatcherTest.java  |  8 +-
 .../core/collector/PipeRealtimeCollectTest.java    |  8 +-
 23 files changed, 458 insertions(+), 91 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index a0416ede651..2d6f401eefb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -21,7 +21,12 @@ package org.apache.iotdb.confignode.consensus.response.pipe.task;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.consensus.common.DataSet;
 
 import java.io.IOException;
@@ -39,11 +44,80 @@ public class PipeTableResp implements DataSet {
     this.allPipeMeta = allPipeMeta;
   }
 
-  public TGetAllPipeInfoResp convertToThriftResponse() throws IOException {
+  public PipeTableResp filter(Boolean whereClause, String pipeName) {
+    if (whereClause == null) {
+      if (pipeName == null) {
+        return this;
+      } else {
+        final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
+        for (PipeMeta pipeMeta : allPipeMeta) {
+          if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
+            filteredPipeMeta.add(pipeMeta);
+            break;
+          }
+        }
+        return new PipeTableResp(status, filteredPipeMeta);
+      }
+    } else {
+      if (pipeName == null) {
+        return this;
+      } else {
+        String sortedConnectorParametersString = null;
+        for (PipeMeta pipeMeta : allPipeMeta) {
+          if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
+            sortedConnectorParametersString =
+                pipeMeta.getStaticMeta().getConnectorParameters().toString();
+            break;
+          }
+        }
+
+        final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
+        for (PipeMeta pipeMeta : allPipeMeta) {
+          if (pipeMeta
+              .getStaticMeta()
+              .getConnectorParameters()
+              .toString()
+              .equals(sortedConnectorParametersString)) {
+            filteredPipeMeta.add(pipeMeta);
+          }
+        }
+        return new PipeTableResp(status, filteredPipeMeta);
+      }
+    }
+  }
+
+  public TGetAllPipeInfoResp convertToTGetAllPipeInfoResp() throws IOException {
     final List<ByteBuffer> pipeInformationByteBuffers = new ArrayList<>();
     for (PipeMeta pipeMeta : allPipeMeta) {
       pipeInformationByteBuffers.add(pipeMeta.serialize());
     }
     return new TGetAllPipeInfoResp(status, pipeInformationByteBuffers);
   }
+
+  public TShowPipeResp convertToTShowPipeResp() {
+    final List<TShowPipeInfo> showPipeInfoList = new ArrayList<>();
+
+    for (PipeMeta pipeMeta : allPipeMeta) {
+      final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
+      final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
+      final StringBuilder exceptionMessageBuilder = new StringBuilder();
+      for (PipeTaskMeta pipeTaskMeta : runtimeMeta.getConsensusGroupIdToTaskMetaMap().values()) {
+        for (Exception e : pipeTaskMeta.getExceptionMessages()) {
+          exceptionMessageBuilder.append(e.getMessage()).append("\n");
+        }
+      }
+
+      showPipeInfoList.add(
+          new TShowPipeInfo(
+              staticMeta.getPipeName(),
+              staticMeta.getCreationTime(),
+              runtimeMeta.getStatus().get().name(),
+              staticMeta.getCollectorParameters().toString(),
+              staticMeta.getProcessorParameters().toString(),
+              staticMeta.getConnectorParameters().toString(),
+              exceptionMessageBuilder.toString()));
+    }
+
+    return new TShowPipeResp().setStatus(status).setPipeInfoList(showPipeInfoList);
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 4d9a4cdc8c8..a6877c1bdca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1612,14 +1612,9 @@ public class ConfigManager implements IManager {
   @Override
   public TShowPipeResp showPipe(TShowPipeReq req) {
     TSStatus status = confirmLeader();
-    LOGGER.info("showPipe: {}", req);
-    TShowPipeResp resp = new TShowPipeResp();
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      // TODO: Implement PipeManager
-      return resp.setStatus(status);
-    } else {
-      return resp.setStatus(status);
-    }
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? pipeManager.getPipeTaskCoordinator().showPipes(req)
+        : new TShowPipeResp().setStatus(status);
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
index c99bd7d1a04..e01f4b785bc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -78,7 +80,7 @@ public class PipeTaskCoordinator {
     try {
       return ((PipeTableResp)
               configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
-          .convertToThriftResponse();
+          .convertToTGetAllPipeInfoResp();
     } catch (IOException e) {
       LOGGER.error("Fail to get AllPipeInfo", e);
       return new TGetAllPipeInfoResp(
@@ -88,6 +90,13 @@ public class PipeTaskCoordinator {
     }
   }
 
+  public TShowPipeResp showPipes(TShowPipeReq req) {
+    return ((PipeTableResp)
+            configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
+        .filter(req.whereClause, req.pipeName)
+        .convertToTShowPipeResp();
+  }
+
   public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
     throw new UnsupportedOperationException("Not implemented yet");
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 3818070daca..c954df5770e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -34,6 +34,10 @@ import org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTabl
 import org.apache.iotdb.confignode.consensus.response.udf.JarResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
+import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -52,6 +56,9 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DEFAULT_COLLECTOR;
+import static org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin.DO_NOTHING_PROCESSOR;
+
 public class PipePluginInfo implements SnapshotProcessor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipePluginInfo.class);
@@ -115,8 +122,46 @@ public class PipePluginInfo implements SnapshotProcessor {
   }
 
   public boolean checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) {
-    // TODO: validate the plugins in the create pipe Req.
-    throw new UnsupportedOperationException("Not implemented yet");
+    final PipeParameters collectorParameters =
+        new PipeParameters(createPipeRequest.getCollectorAttributes());
+    final String collectorPluginName =
+        collectorParameters.getStringOrDefault(
+            PipeCollectorConstant.COLLECTOR_KEY, DEFAULT_COLLECTOR.getPipePluginName());
+    if (!pipePluginMetaKeeper.containsPipePlugin(collectorPluginName)) {
+      LOGGER.warn(
+          "Failed to create pipe, the pipe collector plugin {} does not exist",
+          collectorPluginName);
+      return false;
+    }
+
+    final PipeParameters processorParameters =
+        new PipeParameters(createPipeRequest.getProcessorAttributes());
+    final String processorPluginName =
+        processorParameters.getStringOrDefault(
+            PipeProcessorConstant.PROCESSOR_KEY, DO_NOTHING_PROCESSOR.getPipePluginName());
+    if (!pipePluginMetaKeeper.containsPipePlugin(processorPluginName)) {
+      LOGGER.warn(
+          "Failed to create pipe, the pipe processor plugin {} does not exist",
+          processorPluginName);
+      return false;
+    }
+
+    final PipeParameters connectorParameters =
+        new PipeParameters(createPipeRequest.getConnectorAttributes());
+    if (!connectorParameters.hasAttribute(PipeConnectorConstant.CONNECTOR_KEY)) {
+      LOGGER.warn("Failed to create pipe, the pipe connector plugin is not specified");
+      return false;
+    }
+    final String connectorPluginName =
+        connectorParameters.getString(PipeConnectorConstant.CONNECTOR_KEY);
+    if (!pipePluginMetaKeeper.containsPipePlugin(connectorPluginName)) {
+      LOGGER.warn(
+          "Failed to create pipe, the pipe connector plugin {} does not exist",
+          connectorPluginName);
+      return false;
+    }
+
+    return true;
   }
 
   /////////////////////////////// Pipe Plugin Management ///////////////////////////////
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index adc787f58f9..42de84cc9e5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -153,7 +153,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
   public void serialize(DataOutputStream stream) throws IOException {
     stream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode());
     super.serialize(stream);
-    stream.writeInt(dataRegionGroupToOldAndNewLeaderPairMap.size());
+    ReadWriteIOUtils.write(dataRegionGroupToOldAndNewLeaderPairMap.size(), stream);
     for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> entry :
         dataRegionGroupToOldAndNewLeaderPairMap.entrySet()) {
       ReadWriteIOUtils.write(entry.getKey().getId(), stream);
@@ -165,7 +165,7 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
-    final int size = byteBuffer.getInt();
+    final int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       final int dataRegionGroupId = ReadWriteIOUtils.readInt(byteBuffer);
       final int oldDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
index 15b36ecc8dd..402a9d43bf3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -208,7 +208,7 @@ public abstract class AbstractOperatePipeProcedureV2
   @Override
   public void serialize(DataOutputStream stream) throws IOException {
     super.serialize(stream);
-    stream.writeBoolean(isRollbackFromOperateOnDataNodesSuccessful);
+    ReadWriteIOUtils.write(isRollbackFromOperateOnDataNodesSuccessful, stream);
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index f3bc82dd725..ec45d1c26b4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -180,26 +180,26 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
     stream.writeShort(ProcedureType.CREATE_PIPE_PROCEDURE_V2.getTypeCode());
     super.serialize(stream);
     ReadWriteIOUtils.write(createPipeRequest.getPipeName(), stream);
-    stream.writeInt(createPipeRequest.getCollectorAttributesSize());
+    ReadWriteIOUtils.write(createPipeRequest.getCollectorAttributesSize(), stream);
     for (Map.Entry<String, String> entry : createPipeRequest.getCollectorAttributes().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), stream);
       ReadWriteIOUtils.write(entry.getValue(), stream);
     }
-    stream.writeInt(createPipeRequest.getProcessorAttributesSize());
+    ReadWriteIOUtils.write(createPipeRequest.getProcessorAttributesSize(), stream);
     for (Map.Entry<String, String> entry : createPipeRequest.getProcessorAttributes().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), stream);
       ReadWriteIOUtils.write(entry.getValue(), stream);
     }
-    stream.writeInt(createPipeRequest.getConnectorAttributesSize());
+    ReadWriteIOUtils.write(createPipeRequest.getConnectorAttributesSize(), stream);
     for (Map.Entry<String, String> entry : createPipeRequest.getConnectorAttributes().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), stream);
       ReadWriteIOUtils.write(entry.getValue(), stream);
     }
     if (pipeStaticMeta != null) {
-      stream.writeBoolean(true);
+      ReadWriteIOUtils.write(true, stream);
       pipeStaticMeta.serialize(stream);
     } else {
-      stream.writeBoolean(false);
+      ReadWriteIOUtils.write(false, stream);
     }
   }
 
@@ -212,19 +212,19 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
             .setCollectorAttributes(new HashMap<>())
             .setProcessorAttributes(new HashMap<>())
             .setConnectorAttributes(new HashMap<>());
-    int size = byteBuffer.getInt();
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       createPipeRequest
           .getCollectorAttributes()
           .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
     }
-    size = byteBuffer.getInt();
+    size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       createPipeRequest
           .getProcessorAttributes()
           .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
     }
-    size = byteBuffer.getInt();
+    size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       createPipeRequest
           .getConnectorAttributes()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 724a86f19cb..a24475fc8c9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -200,6 +200,16 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.CREATE_MODEL_PROCEDURE;
     } else if (procedure instanceof DropModelProcedure) {
       return ProcedureType.DROP_MODEL_PROCEDURE;
+    } else if (procedure instanceof CreatePipeProcedureV2) {
+      return ProcedureType.CREATE_PIPE_PROCEDURE_V2;
+    } else if (procedure instanceof StartPipeProcedureV2) {
+      return ProcedureType.START_PIPE_PROCEDURE_V2;
+    } else if (procedure instanceof StopPipeProcedureV2) {
+      return ProcedureType.STOP_PIPE_PROCEDURE_V2;
+    } else if (procedure instanceof DropPipeProcedureV2) {
+      return ProcedureType.DROP_PIPE_PROCEDURE_V2;
+    } else if (procedure instanceof PipeHandleLeaderChangeProcedure) {
+      return ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE;
     }
     return null;
   }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
index 1fa610cc35c..8b15b0ffa9c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.commons.pipe.task.meta;
 
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
 import java.io.FileInputStream;
@@ -55,13 +54,13 @@ public class PipeMeta {
   }
 
   public void serialize(DataOutputStream outputStream) throws IOException {
-    ReadWriteIOUtils.write(staticMeta.serialize(), outputStream);
-    ReadWriteIOUtils.write(runtimeMeta.serialize(), outputStream);
+    staticMeta.serialize(outputStream);
+    runtimeMeta.serialize(outputStream);
   }
 
   public void serialize(FileOutputStream outputStream) throws IOException {
-    ReadWriteIOUtils.write(staticMeta.serialize(), outputStream);
-    ReadWriteIOUtils.write(runtimeMeta.serialize(), outputStream);
+    staticMeta.serialize(outputStream);
+    runtimeMeta.serialize(outputStream);
   }
 
   public static PipeMeta deserialize(FileInputStream fileInputStream) throws IOException {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
index 683177573ad..54f664c3953 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeRuntimeMeta.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -74,9 +75,31 @@ public class PipeRuntimeMeta {
     }
   }
 
+  public void serialize(FileOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(status.get().getType(), outputStream);
+
+    ReadWriteIOUtils.write(consensusGroupIdToTaskMetaMap.size(), outputStream);
+    for (Map.Entry<TConsensusGroupId, PipeTaskMeta> entry :
+        consensusGroupIdToTaskMetaMap.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey().getId(), outputStream);
+      entry.getValue().serialize(outputStream);
+    }
+  }
+
   public static PipeRuntimeMeta deserialize(InputStream inputStream) throws IOException {
-    return deserialize(
-        ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+    final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
+
+    pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(inputStream)));
+
+    final int size = ReadWriteIOUtils.readInt(inputStream);
+    for (int i = 0; i < size; ++i) {
+      pipeRuntimeMeta.consensusGroupIdToTaskMetaMap.put(
+          new TConsensusGroupId(
+              TConsensusGroupType.DataRegion, ReadWriteIOUtils.readInt(inputStream)),
+          PipeTaskMeta.deserialize(inputStream));
+    }
+
+    return pipeRuntimeMeta;
   }
 
   public static PipeRuntimeMeta deserialize(ByteBuffer byteBuffer) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index f487de445cb..df8be6538a2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -47,7 +48,7 @@ public class PipeStaticMeta {
       Map<String, String> collectorAttributes,
       Map<String, String> processorAttributes,
       Map<String, String> connectorAttributes) {
-    this.pipeName = pipeName.toUpperCase();
+    this.pipeName = pipeName;
     this.creationTime = creationTime;
     collectorParameters = new PipeParameters(collectorAttributes);
     processorParameters = new PipeParameters(processorAttributes);
@@ -85,17 +86,38 @@ public class PipeStaticMeta {
     ReadWriteIOUtils.write(pipeName, outputStream);
     ReadWriteIOUtils.write(creationTime, outputStream);
 
-    outputStream.writeInt(collectorParameters.getAttribute().size());
+    ReadWriteIOUtils.write(collectorParameters.getAttribute().size(), outputStream);
     for (Map.Entry<String, String> entry : collectorParameters.getAttribute().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue(), outputStream);
     }
-    outputStream.writeInt(processorParameters.getAttribute().size());
+    ReadWriteIOUtils.write(processorParameters.getAttribute().size(), outputStream);
     for (Map.Entry<String, String> entry : processorParameters.getAttribute().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue(), outputStream);
     }
-    outputStream.writeInt(connectorParameters.getAttribute().size());
+    ReadWriteIOUtils.write(connectorParameters.getAttribute().size(), outputStream);
+    for (Map.Entry<String, String> entry : connectorParameters.getAttribute().entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), outputStream);
+      ReadWriteIOUtils.write(entry.getValue(), outputStream);
+    }
+  }
+
+  public void serialize(FileOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(pipeName, outputStream);
+    ReadWriteIOUtils.write(creationTime, outputStream);
+
+    ReadWriteIOUtils.write(collectorParameters.getAttribute().size(), outputStream);
+    for (Map.Entry<String, String> entry : collectorParameters.getAttribute().entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), outputStream);
+      ReadWriteIOUtils.write(entry.getValue(), outputStream);
+    }
+    ReadWriteIOUtils.write(processorParameters.getAttribute().size(), outputStream);
+    for (Map.Entry<String, String> entry : processorParameters.getAttribute().entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), outputStream);
+      ReadWriteIOUtils.write(entry.getValue(), outputStream);
+    }
+    ReadWriteIOUtils.write(connectorParameters.getAttribute().size(), outputStream);
     for (Map.Entry<String, String> entry : connectorParameters.getAttribute().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue(), outputStream);
@@ -103,8 +125,35 @@ public class PipeStaticMeta {
   }
 
   public static PipeStaticMeta deserialize(InputStream inputStream) throws IOException {
-    return deserialize(
-        ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+    final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
+
+    pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(inputStream);
+    pipeStaticMeta.creationTime = ReadWriteIOUtils.readLong(inputStream);
+
+    pipeStaticMeta.collectorParameters = new PipeParameters(new HashMap<>());
+    pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
+    pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
+
+    int size = ReadWriteIOUtils.readInt(inputStream);
+    for (int i = 0; i < size; ++i) {
+      final String key = ReadWriteIOUtils.readString(inputStream);
+      final String value = ReadWriteIOUtils.readString(inputStream);
+      pipeStaticMeta.collectorParameters.getAttribute().put(key, value);
+    }
+    size = ReadWriteIOUtils.readInt(inputStream);
+    for (int i = 0; i < size; ++i) {
+      final String key = ReadWriteIOUtils.readString(inputStream);
+      final String value = ReadWriteIOUtils.readString(inputStream);
+      pipeStaticMeta.processorParameters.getAttribute().put(key, value);
+    }
+    size = ReadWriteIOUtils.readInt(inputStream);
+    for (int i = 0; i < size; ++i) {
+      final String key = ReadWriteIOUtils.readString(inputStream);
+      final String value = ReadWriteIOUtils.readString(inputStream);
+      pipeStaticMeta.connectorParameters.getAttribute().put(key, value);
+    }
+
+    return pipeStaticMeta;
   }
 
   public static PipeStaticMeta deserialize(ByteBuffer byteBuffer) {
@@ -117,26 +166,23 @@ public class PipeStaticMeta {
     pipeStaticMeta.processorParameters = new PipeParameters(new HashMap<>());
     pipeStaticMeta.connectorParameters = new PipeParameters(new HashMap<>());
 
-    int size = byteBuffer.getInt();
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
-      pipeStaticMeta
-          .collectorParameters
-          .getAttribute()
-          .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+      final String key = ReadWriteIOUtils.readString(byteBuffer);
+      final String value = ReadWriteIOUtils.readString(byteBuffer);
+      pipeStaticMeta.collectorParameters.getAttribute().put(key, value);
     }
-    size = byteBuffer.getInt();
+    size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
-      pipeStaticMeta
-          .processorParameters
-          .getAttribute()
-          .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+      final String key = ReadWriteIOUtils.readString(byteBuffer);
+      final String value = ReadWriteIOUtils.readString(byteBuffer);
+      pipeStaticMeta.processorParameters.getAttribute().put(key, value);
     }
-    size = byteBuffer.getInt();
+    size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
-      pipeStaticMeta
-          .connectorParameters
-          .getAttribute()
-          .put(ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readString(byteBuffer));
+      final String key = ReadWriteIOUtils.readString(byteBuffer);
+      final String value = ReadWriteIOUtils.readString(byteBuffer);
+      pipeStaticMeta.connectorParameters.getAttribute().put(key, value);
     }
 
     return pipeStaticMeta;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index 1131047c104..213c3da3a22 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -93,6 +94,17 @@ public class PipeTaskMeta {
     }
   }
 
+  public void serialize(FileOutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(progressIndex.get(), outputStream);
+    ReadWriteIOUtils.write(regionLeader.get(), outputStream);
+    ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
+    for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
+      ReadWriteIOUtils.write(
+          exceptionMessage instanceof PipeRuntimeCriticalException, outputStream);
+      ReadWriteIOUtils.write(exceptionMessage.getMessage(), outputStream);
+    }
+  }
+
   public static PipeTaskMeta deserialize(ByteBuffer byteBuffer) {
     final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta();
     PipeTaskMeta.progressIndex.set(ReadWriteIOUtils.readLong(byteBuffer));
@@ -110,8 +122,19 @@ public class PipeTaskMeta {
   }
 
   public static PipeTaskMeta deserialize(InputStream inputStream) throws IOException {
-    return deserialize(
-        ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+    final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta();
+    pipeTaskMeta.progressIndex.set(ReadWriteIOUtils.readLong(inputStream));
+    pipeTaskMeta.regionLeader.set(ReadWriteIOUtils.readInt(inputStream));
+    final int size = ReadWriteIOUtils.readInt(inputStream); // (critical flag, message) pairs
+    for (int i = 0; i < size; ++i) {
+      final boolean critical = ReadWriteIOUtils.readBool(inputStream);
+      final String message = ReadWriteIOUtils.readString(inputStream);
+      pipeTaskMeta.exceptionMessages.add(
+          critical
+              ? new PipeRuntimeCriticalException(message)
+              : new PipeRuntimeNonCriticalException(message));
+    }
+    return pipeTaskMeta;
   }
 
   @Override
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
new file mode 100644
index 00000000000..08e7f86007c
--- /dev/null
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+public class PipeMetaDeSerTest {
+  /** Each pipe meta type must equal itself after a serialize()/deserialize() round trip. */
+  @Test
+  public void test() throws IOException {
+    PipeStaticMeta pipeStaticMeta =
+        new PipeStaticMeta(
+            "pipeName",
+            123L,
+            new HashMap<String, String>() {
+              {
+                put("collector-key", "collector-value");
+              }
+            },
+            new HashMap<String, String>() {
+              {
+                put("processor-key-1", "processor-value-1");
+                put("processor-key-2", "processor-value-2");
+              }
+            },
+            new HashMap<String, String>());
+    ByteBuffer staticByteBuffer = pipeStaticMeta.serialize();
+    PipeStaticMeta pipeStaticMeta1 = PipeStaticMeta.deserialize(staticByteBuffer);
+    Assert.assertEquals(pipeStaticMeta, pipeStaticMeta1);
+
+    PipeRuntimeMeta pipeRuntimeMeta =
+        new PipeRuntimeMeta(
+            new HashMap<TConsensusGroupId, PipeTaskMeta>() {
+              {
+                put(
+                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
+                    new PipeTaskMeta(789, 987));
+                put(
+                    new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
+                    new PipeTaskMeta(456, 789));
+              }
+            });
+    ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize();
+    PipeRuntimeMeta pipeRuntimeMeta1 = PipeRuntimeMeta.deserialize(runtimeByteBuffer);
+    Assert.assertEquals(pipeRuntimeMeta, pipeRuntimeMeta1);
+
+    PipeMeta pipeMeta = new PipeMeta(pipeStaticMeta, pipeRuntimeMeta);
+    ByteBuffer byteBuffer = pipeMeta.serialize();
+    PipeMeta pipeMeta1 = PipeMeta.deserialize(byteBuffer);
+    Assert.assertEquals(pipeMeta, pipeMeta1);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 3119043f3f6..5fb4f5b7b5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -85,7 +85,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
@@ -1645,8 +1644,8 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
       if (showPipeStatement.getWhereClause()) {
         tShowPipeReq.setWhereClause(true);
       }
-      TShowPipeResp resp = configNodeClient.showPipe(tShowPipeReq);
-      List<TShowPipeInfo> tShowPipeInfoList = new ArrayList<>();
+      List<TShowPipeInfo> tShowPipeInfoList =
+          configNodeClient.showPipe(tShowPipeReq).getPipeInfoList();
       ShowPipeTask.buildTSBlock(tShowPipeInfoList, future);
     } catch (Exception e) {
       future.setException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index bea8913d268..82b7b494f91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -3384,7 +3384,9 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     Map<String, String> collectorMap = new HashMap<>();
     for (IoTDBSqlParser.CollectorAttributeClauseContext singleCtx :
         ctx.collectorAttributeClause()) {
-      collectorMap.put(singleCtx.collectorKey.getText(), singleCtx.collectorValue.getText());
+      collectorMap.put(
+          parseStringLiteral(singleCtx.collectorKey.getText()),
+          parseStringLiteral(singleCtx.collectorValue.getText()));
     }
     return collectorMap;
   }
@@ -3394,7 +3396,9 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     Map<String, String> processorMap = new HashMap<>();
     for (IoTDBSqlParser.ProcessorAttributeClauseContext singleCtx :
         ctx.processorAttributeClause()) {
-      processorMap.put(singleCtx.processorKey.getText(), singleCtx.processorValue.getText());
+      processorMap.put(
+          parseStringLiteral(singleCtx.processorKey.getText()),
+          parseStringLiteral(singleCtx.processorValue.getText()));
     }
     return processorMap;
   }
@@ -3404,7 +3408,9 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     Map<String, String> connectorMap = new HashMap<>();
     for (IoTDBSqlParser.ConnectorAttributeClauseContext singleCtx :
         ctx.connectorAttributeClause()) {
-      connectorMap.put(singleCtx.connectorKey.getText(), singleCtx.connectorValue.getText());
+      connectorMap.put(
+          parseStringLiteral(singleCtx.connectorKey.getText()),
+          parseStringLiteral(singleCtx.connectorValue.getText()));
     }
     return connectorMap;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
index 5906de3a49a..06485b8cc2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeCollectorConstant.java
@@ -22,7 +22,10 @@ package org.apache.iotdb.db.pipe.config;
 public class PipeCollectorConstant {
 
   public static final String COLLECTOR_KEY = "collector";
-  public static final String PATTERN_PATTERN_KEY = "collector.pattern";
+
+  public static final String COLLECTOR_PATTERN_KEY = "collector.pattern";
+  public static final String COLLECTOR_PATTERN_DEFAULT_VALUE = "root";
+
   public static final String DATA_REGION_KEY = "collector.data-region";
 
   private PipeCollectorConstant() {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index 08e731df136..41e9f5d76fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -34,14 +34,16 @@ public abstract class PipeRealtimeDataRegionCollector implements PipeCollector {
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    validator.validateRequiredAttribute(PipeCollectorConstant.PATTERN_PATTERN_KEY);
     validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
   }
 
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) {
-    pattern = parameters.getString(PipeCollectorConstant.PATTERN_PATTERN_KEY);
+    pattern =
+        parameters.getStringOrDefault(
+            PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
+            PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
     dataRegionId = parameters.getString(PipeCollectorConstant.DATA_REGION_KEY);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
index 070e4e05b2c..33a298c5d02 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/PipeConnectorSubtaskManager.java
@@ -24,9 +24,12 @@ import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -38,13 +41,27 @@ public class PipeConnectorSubtaskManager {
       attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
 
   public synchronized String register(
-      PipeConnectorSubtaskExecutor executor, PipeParameters connectorAttributes) {
+      PipeConnectorSubtaskExecutor executor, PipeParameters pipeConnectorParameters) {
     final String attributeSortedString =
-        new TreeMap<>(connectorAttributes.getAttribute()).toString();
+        new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
 
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
-      final PipeConnector pipeConnector = PipeAgent.plugin().reflectConnector(connectorAttributes);
+      // 1. construct, validate and customize PipeConnector
+      final PipeConnector pipeConnector =
+          PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
+      try {
+        pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters));
+        final PipeConnectorRuntimeConfiguration runtimeConfiguration =
+            new PipeConnectorRuntimeConfiguration();
+        pipeConnector.customize(pipeConnectorParameters, runtimeConfiguration);
+        // TODO: use runtimeConfiguration to configure PipeConnector
+      } catch (Exception e) {
+        throw new PipeManagementException(
+            "Failed to construct PipeConnector, because of " + e.getMessage(), e);
+      }
+
       // TODO: make pendingQueue size configurable
+      // 2. construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
       final ListenableBlockingPendingQueue<Event> pendingQueue =
           new ListenableBlockingPendingQueue<>(65535);
       final PipeConnectorSubtask pipeConnectorSubtask =
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 73c127f3dab..31e935256ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
 import org.apache.iotdb.db.pipe.task.queue.EventSupplier;
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnblockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
@@ -47,17 +49,14 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
    */
   private ListenableUnblockingPendingQueue<Event> collectorPendingQueue;
 
-  private PipeCollector pipeCollector;
+  private final PipeCollector pipeCollector;
 
   public PipeTaskCollectorStage(String dataRegionId, PipeParameters collectorParameters) {
     this.collectorParameters = collectorParameters;
     // set data region id to collector parameters, so that collector can get data region id inside
     // collector
     collectorParameters.getAttribute().put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
-  }
 
-  @Override
-  public void createSubtask() throws PipeException {
     if (collectorParameters
         .getStringOrDefault(
             PipeCollectorConstant.COLLECTOR_KEY,
@@ -70,6 +69,22 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
     }
   }
 
+  @Override
+  public void createSubtask() throws PipeException {
+    try {
+      // the collector checks its parameters first and is only then asked to customize itself
+      final PipeParameterValidator validator = new PipeParameterValidator(collectorParameters);
+      pipeCollector.validate(validator);
+      final PipeCollectorRuntimeConfiguration configuration =
+          new PipeCollectorRuntimeConfiguration();
+      pipeCollector.customize(collectorParameters, configuration);
+      // TODO: use configuration to configure collector
+    } catch (Exception e) {
+      // any failure above is rethrown as a PipeException, passing the caught exception along
+      throw new PipeException(e.getMessage(), e);
+    }
+  }
+
   @Override
   public void startSubtask() throws PipeException {
     try {
@@ -94,7 +109,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
   }
 
   public EventSupplier getEventSupplier() {
-    return () -> pipeCollector.supply();
+    return pipeCollector::supply;
   }
 
   public ListenableUnblockingPendingQueue<Event> getCollectorPendingQueue() {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 77bc2de5f44..3ba031a23a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -29,7 +29,9 @@ import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.queue.ListenablePendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask;
 import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
@@ -40,7 +42,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
   protected final PipeProcessorSubtaskExecutor executor =
       PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
 
-  protected final PipeProcessorSubtask subtask;
+  protected final PipeParameters pipeProcessorParameters;
+  protected final PipeProcessor pipeProcessor;
+  protected final PipeProcessorSubtask pipeProcessorSubtask;
 
   protected final ListenablePendingQueue<Event> pipeCollectorInputPendingQueue;
   protected final ListenablePendingQueue<Event> pipeConnectorOutputPendingQueue;
@@ -61,13 +65,14 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       @Nullable ListenablePendingQueue<Event> pipeCollectorInputPendingQueue,
       PipeParameters pipeProcessorParameters,
       ListenableBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
+    this.pipeProcessorParameters = pipeProcessorParameters;
+
     final String taskId = pipeName + "_" + dataRegionId;
-    final PipeProcessor pipeProcessor =
-        PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
+    pipeProcessor = PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
     final PipeEventCollector pipeConnectorOutputEventCollector =
         new PipeEventCollector(pipeConnectorOutputPendingQueue);
 
-    this.subtask =
+    this.pipeProcessorSubtask =
         new PipeProcessorSubtask(
             taskId,
             pipeCollectorInputEventSupplier,
@@ -81,43 +86,58 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
                     taskId,
                     () -> {
                       if (status == PipeStatus.RUNNING) {
-                        executor.start(subtask.getTaskID());
+                        executor.start(pipeProcessorSubtask.getTaskID());
                       }
                     })
-                .registerNotEmptyToEmptyListener(taskId, () -> executor.stop(subtask.getTaskID()))
+                .registerNotEmptyToEmptyListener(
+                    taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID()))
             : null;
     this.pipeConnectorOutputPendingQueue =
         pipeConnectorOutputPendingQueue
-            .registerNotFullToFullListener(taskId, () -> executor.stop(subtask.getTaskID()))
+            .registerNotFullToFullListener(
+                taskId, () -> executor.stop(pipeProcessorSubtask.getTaskID()))
             .registerFullToNotFullListener(
                 taskId,
                 () -> {
                   // only start when the pipe is running
                   if (status == PipeStatus.RUNNING) {
                     pipeConnectorOutputEventCollector.tryCollectBufferedEvents();
-                    executor.start(subtask.getTaskID());
+                    executor.start(pipeProcessorSubtask.getTaskID());
                   }
                 });
   }
 
   @Override
   public void createSubtask() throws PipeException {
-    executor.register(subtask);
+    try {
+      // 1. validate processor parameters
+      pipeProcessor.validate(new PipeParameterValidator(pipeProcessorParameters));
+
+      // 2. customize processor
+      final PipeProcessorRuntimeConfiguration runtimeConfiguration =
+          new PipeProcessorRuntimeConfiguration();
+      pipeProcessor.customize(pipeProcessorParameters, runtimeConfiguration);
+      // TODO: use runtimeConfiguration to configure processor
+    } catch (Exception e) {
+      throw new PipeException(e.getMessage(), e);
+    }
+
+    executor.register(pipeProcessorSubtask);
   }
 
   @Override
   public void startSubtask() throws PipeException {
-    executor.start(subtask.getTaskID());
+    executor.start(pipeProcessorSubtask.getTaskID());
   }
 
   @Override
   public void stopSubtask() throws PipeException {
-    executor.stop(subtask.getTaskID());
+    executor.stop(pipeProcessorSubtask.getTaskID());
   }
 
   @Override
   public void dropSubtask() throws PipeException {
-    final String taskId = subtask.getTaskID();
+    final String taskId = pipeProcessorSubtask.getTaskID();
 
     if (pipeCollectorInputPendingQueue != null) {
       pipeCollectorInputPendingQueue.removeEmptyToNotEmptyListener(taskId);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index c8931a521f8..6e0f82681a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -804,8 +804,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
 
   @Override
   public TSStatus pushPipeMeta(TPushPipeMetaReq req) {
-    List<PipeMeta> pipeMetas = new ArrayList<>();
-    req.getPipeMetas().forEach(byteBuffer -> pipeMetas.add(PipeMeta.deserialize(byteBuffer)));
+    final List<PipeMeta> pipeMetas = new ArrayList<>();
+    for (ByteBuffer byteBuffer : req.getPipeMetas()) {
+      pipeMetas.add(PipeMeta.deserialize(byteBuffer));
+    }
     PipeAgent.task().handlePipeMetaChanges(pipeMetas);
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index 741e4ddb1b1..d833e2761a0 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -69,7 +69,7 @@ public class CachedSchemaPatternMatcherTest {
         new PipeParameters(
             new HashMap<String, String>() {
               {
-                put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root");
+                put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root");
                 put(PipeCollectorConstant.DATA_REGION_KEY, "1");
               }
             }),
@@ -85,7 +85,7 @@ public class CachedSchemaPatternMatcherTest {
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI1);
+                  put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, "root." + finalI1);
                   put(PipeCollectorConstant.DATA_REGION_KEY, "1");
                 }
               }),
@@ -99,7 +99,9 @@ public class CachedSchemaPatternMatcherTest {
             new PipeParameters(
                 new HashMap<String, String>() {
                   {
-                    put(PipeCollectorConstant.PATTERN_PATTERN_KEY, "root." + finalI + "." + finalJ);
+                    put(
+                        PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
+                        "root." + finalI + "." + finalJ);
                     put(PipeCollectorConstant.DATA_REGION_KEY, "1");
                   }
                 }),
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index f4e7f7f6843..a2c3508b294 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -111,7 +111,7 @@ public class PipeRealtimeCollectTest {
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
+                  put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1);
                   put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
                 }
               }),
@@ -120,7 +120,7 @@ public class PipeRealtimeCollectTest {
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
+                  put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2);
                   put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion1);
                 }
               }),
@@ -129,7 +129,7 @@ public class PipeRealtimeCollectTest {
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern1);
+                  put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern1);
                   put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
                 }
               }),
@@ -138,7 +138,7 @@ public class PipeRealtimeCollectTest {
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeCollectorConstant.PATTERN_PATTERN_KEY, pattern2);
+                  put(PipeCollectorConstant.COLLECTOR_PATTERN_KEY, pattern2);
                   put(PipeCollectorConstant.DATA_REGION_KEY, dataRegion2);
                 }
               }),