You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/21 15:15:23 UTC

[iotdb] 01/01: Revert "[IOTDB-5893] Pipe: PipeLauncher for data node setup process (#9879)"

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch revert-IOTDB-5893
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9124e21b8a49206a054b0c3fee41f071fb97bb14
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 21 23:14:49 2023 +0800

    Revert "[IOTDB-5893] Pipe: PipeLauncher for data node setup process (#9879)"
    
    This reverts commit 6f3f4e0d92f9b01c76990e14490d62c5b6657ad2.
---
 .../iotdb/confignode/manager/ConfigManager.java    |  11 +-
 .../apache/iotdb/confignode/manager/IManager.java  |   8 +
 .../iotdb/confignode/manager/pipe/PipeManager.java |   3 -
 .../manager/pipe/{runtime => }/PipeMetaSyncer.java |   2 +-
 .../pipe/{plugin => }/PipePluginCoordinator.java   |   2 +-
 .../pipe/{runtime => }/PipeRuntimeCoordinator.java |   2 +-
 .../pipe/{task => }/PipeTaskCoordinator.java       |  23 +--
 .../pipe/plugin/CreatePipePluginProcedure.java     |   2 +-
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |   2 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   7 +
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  17 +++
 .../iotdb/db/pipe/agent/runtime/PipeLauncher.java  | 166 ---------------------
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   9 --
 .../java/org/apache/iotdb/db/service/DataNode.java |  94 +++++++++++-
 .../db/sync/common/ClusterSyncInfoFetcher.java     |  11 +-
 .../src/main/thrift/confignode.thrift              |   9 +-
 16 files changed, 170 insertions(+), 198 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a23b6b44412..11d2bd2912e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -144,6 +144,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -1623,10 +1624,18 @@ public class ConfigManager implements IManager {
   public TGetAllPipeInfoResp getAllPipeInfo() {
     TSStatus status = confirmLeader();
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        ? pipeManager.getPipeTaskCoordinator().getAllPipeInfo()
+        ? pipeManager.getPipeTaskCoordinator().showPipes()
         : new TGetAllPipeInfoResp().setStatus(status);
   }
 
+  @Override
+  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? pipeManager.getPipeTaskCoordinator().recordPipeMessage(req)
+        : status;
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
     TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 5eea6b30fdf..4a305f822bc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -87,6 +87,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -597,6 +598,13 @@ public interface IManager {
    */
   TGetAllPipeInfoResp getAllPipeInfo();
 
+  /**
+   * Record PipeMessage
+   *
+   * @return TSStatus
+   */
+  TSStatus recordPipeMessage(TRecordPipeMessageReq req);
+
   /**
    * Get RegionId。used for Show cluster slots information in
    * docs/zh/UserGuide/Cluster/Cluster-Maintenance.md.
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
index be9440796fa..ec9ffd63009 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
@@ -20,9 +20,6 @@
 package org.apache.iotdb.confignode.manager.pipe;
 
 import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
-import org.apache.iotdb.confignode.manager.pipe.runtime.PipeRuntimeCoordinator;
-import org.apache.iotdb.confignode.manager.pipe.task.PipeTaskCoordinator;
 import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
 
 public class PipeManager {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
index ba9bba61c6a..515abfdbdd1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.runtime;
+package org.apache.iotdb.confignode.manager.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
index 7435344edbf..8943a28e8c2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.plugin;
+package org.apache.iotdb.confignode.manager.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
index 9a117a0fbd1..81df8587176 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.runtime;
+package org.apache.iotdb.confignode.manager.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
similarity index 91%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
index 573c1cb1a80..e01f4b785bc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.pipe.task;
+package org.apache.iotdb.confignode.manager.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
@@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -75,14 +76,7 @@ public class PipeTaskCoordinator {
     return configManager.getProcedureManager().dropPipe(pipeName);
   }
 
-  public TShowPipeResp showPipes(TShowPipeReq req) {
-    return ((PipeTableResp)
-            configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
-        .filter(req.whereClause, req.pipeName)
-        .convertToTShowPipeResp();
-  }
-
-  public TGetAllPipeInfoResp getAllPipeInfo() {
+  public TGetAllPipeInfoResp showPipes() {
     try {
       return ((PipeTableResp)
               configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
@@ -95,4 +89,15 @@ public class PipeTaskCoordinator {
           Collections.emptyList());
     }
   }
+
+  public TShowPipeResp showPipes(TShowPipeReq req) {
+    return ((PipeTableResp)
+            configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
+        .filter(req.whereClause, req.pipeName)
+        .convertToTShowPipeResp();
+  }
+
+  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index 88f40bdc114..b4790661278 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index d44df9ac580..7c8268ecd02 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
 
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
-import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index c2b44e07de1..22367d7c164 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -127,6 +127,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -908,6 +909,12 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return configManager.getAllPipeInfo();
   }
 
+  @Override
+  @Deprecated
+  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
+    return configManager.recordPipeMessage(req);
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
     if (req.isSetTimeStamp() && req.getType() != TConsensusGroupType.DataRegion) {
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 4376f6a4eec..ca35b446fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -96,6 +96,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -1879,6 +1880,22 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  @Override
+  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSStatus status = client.recordPipeMessage(req);
+        if (!updateConfigNodeLeader(status)) {
+          return status;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      waitAndReconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
deleted file mode 100644
index 2d0d45ebc02..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.agent.runtime;
-
-import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
-import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
-import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
-import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
-import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
-import org.apache.iotdb.db.client.ConfigNodeClient;
-import org.apache.iotdb.db.client.ConfigNodeClientManager;
-import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.service.ResourcesInformationHolder;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.TException;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class PipeLauncher {
-
-  private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
-
-  public void launchPipePluginAgent(ResourcesInformationHolder resourcesInformationHolder)
-      throws StartupException {
-    initPipePluginRelatedInstances();
-
-    if (resourcesInformationHolder.getPipePluginMetaList() == null
-        || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) {
-      return;
-    }
-
-    final List<PipePluginMeta> uninstalledOrConflictedPipePluginMetaList =
-        getUninstalledOrConflictedPipePluginMetaList(resourcesInformationHolder);
-    int index = 0;
-    while (index < uninstalledOrConflictedPipePluginMetaList.size()) {
-      List<PipePluginMeta> curList = new ArrayList<>();
-      int offset = 0;
-      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
-          && index + offset < uninstalledOrConflictedPipePluginMetaList.size()) {
-        curList.add(uninstalledOrConflictedPipePluginMetaList.get(index + offset));
-        offset++;
-      }
-      index += (offset + 1);
-      fetchAndSavePipePluginJars(curList);
-    }
-
-    // create instances of pipe plugins and do registration
-    try {
-      for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
-        if (meta.isBuiltin()) {
-          continue;
-        }
-        PipeAgent.plugin().doRegister(meta);
-      }
-    } catch (Exception e) {
-      throw new StartupException(e);
-    }
-  }
-
-  private void initPipePluginRelatedInstances() throws StartupException {
-    try {
-      PipePluginExecutableManager.setupAndGetInstance(
-          IOTDB_CONFIG.getPipeTemporaryLibDir(), IOTDB_CONFIG.getPipeDir());
-      PipePluginClassLoaderManager.setupAndGetInstance(IOTDB_CONFIG.getPipeDir());
-    } catch (IOException e) {
-      throw new StartupException(e);
-    }
-  }
-
-  private List<PipePluginMeta> getUninstalledOrConflictedPipePluginMetaList(
-      ResourcesInformationHolder resourcesInformationHolder) {
-    final List<PipePluginMeta> pipePluginMetaList = new ArrayList<>();
-    for (PipePluginMeta pipePluginMeta : resourcesInformationHolder.getPipePluginMetaList()) {
-      if (pipePluginMeta.isBuiltin()) {
-        continue;
-      }
-      // If jar does not exist, add current pipePluginMeta to list
-      if (!PipePluginExecutableManager.getInstance()
-          .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
-        pipePluginMetaList.add(pipePluginMeta);
-      } else {
-        try {
-          // local jar has conflicts with jar on config node, add current pipePluginMeta to list
-          if (!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
-            pipePluginMetaList.add(pipePluginMeta);
-          }
-        } catch (PipeManagementException e) {
-          pipePluginMetaList.add(pipePluginMeta);
-        }
-      }
-    }
-    return pipePluginMetaList;
-  }
-
-  private void fetchAndSavePipePluginJars(List<PipePluginMeta> pipePluginMetaList)
-      throws StartupException {
-    try (ConfigNodeClient configNodeClient =
-        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final List<String> jarNameList =
-          pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList());
-      final TGetJarInListResp resp =
-          configNodeClient.getPipePluginJar(new TGetJarInListReq(jarNameList));
-      if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
-        throw new StartupException("Failed to get pipe plugin jar from config node.");
-      }
-      final List<ByteBuffer> jarList = resp.getJarList();
-      for (int i = 0; i < pipePluginMetaList.size(); i++) {
-        PipePluginExecutableManager.getInstance()
-            .saveToInstallDir(jarList.get(i), pipePluginMetaList.get(i).getJarName());
-      }
-    } catch (IOException | TException | ClientManagerException e) {
-      throw new StartupException(e);
-    }
-  }
-
-  public void launchPipeTaskAgent() throws StartupException {
-    try (final ConfigNodeClient configNodeClient =
-        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final TGetAllPipeInfoResp getAllPipeInfoResp = configNodeClient.getAllPipeInfo();
-      if (getAllPipeInfoResp.getStatus().getCode()
-          == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
-        throw new StartupException("Failed to get pipe task meta from config node.");
-      }
-
-      PipeAgent.task()
-          .handlePipeMetaChanges(
-              getAllPipeInfoResp.getAllPipeInfo().stream()
-                  .map(PipeMeta::deserialize)
-                  .collect(Collectors.toList()));
-    } catch (StartupException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new StartupException(e);
-    }
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 2d85aac9e8c..1160ff4a845 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -19,9 +19,7 @@
 
 package org.apache.iotdb.db.pipe.agent.runtime;
 
-import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
-import org.apache.iotdb.db.service.ResourcesInformationHolder;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
 
 import org.slf4j.Logger;
@@ -31,13 +29,6 @@ public class PipeRuntimeAgent {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class);
 
-  public synchronized void launch(ResourcesInformationHolder resourcesInformationHolder)
-      throws StartupException {
-    final PipeLauncher pipeLauncher = new PipeLauncher();
-    pipeLauncher.launchPipePluginAgent(resourcesInformationHolder);
-    pipeLauncher.launchPipeTaskAgent();
-  }
-
   public void report(PipeSubtask subtask) {
     // TODO: terminate the task by the given taskID
     LOGGER.warn(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 44a5690db8c..3e0f39ac4ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.service.metric.MetricService;
@@ -82,6 +84,7 @@ import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.utils.WALMode;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.utils.InternalReporterType;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.udf.api.exception.UDFManagementException;
 
@@ -458,7 +461,7 @@ public class DataNode implements DataNodeMBean {
   private void prepareResources() throws StartupException {
     prepareUDFResources();
     prepareTriggerResources();
-    preparePipeResources();
+    preparePipePluginResources();
   }
 
   /** register services and set up DataNode */
@@ -833,8 +836,93 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
-  private void preparePipeResources() throws StartupException {
-    PipeAgent.runtime().launch(resourcesInformationHolder);
+  private void preparePipePluginResources() throws StartupException {
+    initPipePluginRelatedInstance();
+    if (resourcesInformationHolder.getPipePluginMetaList() == null
+        || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) {
+      return;
+    }
+
+    // fetch missing/conflicting jars from config node, at most getJarNumOfOneRpc() per RPC
+    List<PipePluginMeta> pipePluginNeedJarList = getJarListForPipePlugin();
+    int index = 0;
+    while (index < pipePluginNeedJarList.size()) {
+      List<PipePluginMeta> curList = new ArrayList<>();
+      int offset = 0;
+      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
+          && index + offset < pipePluginNeedJarList.size()) {
+        curList.add(pipePluginNeedJarList.get(index + offset));
+        offset++;
+      }
+      index += Math.max(offset, 1); // advance by the batch size; "+ 1" skipped a plugin
+      getJarOfPipePlugins(curList);
+    }
+
+    // register every non-builtin pipe plugin; any failure is rethrown as StartupException
+    try {
+      for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
+        if (meta.isBuiltin()) {
+          continue;
+        }
+        PipeAgent.plugin().doRegister(meta);
+      }
+    } catch (Exception e) {
+      throw new StartupException(e);
+    }
+  }
+
+  private void initPipePluginRelatedInstance() throws StartupException {
+    try {
+      PipePluginExecutableManager.setupAndGetInstance(
+          config.getPipeTemporaryLibDir(), config.getPipeDir());
+      PipePluginClassLoaderManager.setupAndGetInstance(config.getPipeDir());
+    } catch (IOException e) {
+      throw new StartupException(e);
+    }
+  }
+
+  private List<PipePluginMeta> getJarListForPipePlugin() {
+    List<PipePluginMeta> res = new ArrayList<>();
+    for (PipePluginMeta pipePluginMeta : resourcesInformationHolder.getPipePluginMetaList()) {
+      if (pipePluginMeta.isBuiltin()) {
+        continue;
+      }
+      // If jar does not exist, add current pipePluginMeta to list
+      if (!PipePluginExecutableManager.getInstance()
+          .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
+        res.add(pipePluginMeta);
+      } else {
+        try {
+          // local jar has conflicts with jar on config node, add current pipePluginMeta to list
+          if (!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
+            res.add(pipePluginMeta);
+          }
+        } catch (PipeManagementException e) {
+          res.add(pipePluginMeta);
+        }
+      }
+    }
+    return res;
+  }
+
+  private void getJarOfPipePlugins(List<PipePluginMeta> pipePluginMetaList)
+      throws StartupException {
+    try (ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      List<String> jarNameList =
+          pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList());
+      TGetJarInListResp resp = configNodeClient.getPipePluginJar(new TGetJarInListReq(jarNameList));
+      if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+        throw new StartupException("Failed to get pipe plugin jar from config node.");
+      }
+      List<ByteBuffer> jarList = resp.getJarList();
+      for (int i = 0; i < pipePluginMetaList.size(); i++) {
+        PipePluginExecutableManager.getInstance()
+            .saveToInstallDir(jarList.get(i), pipePluginMetaList.get(i).getJarName());
+      }
+    } catch (IOException | TException | ClientManagerException e) {
+      throw new StartupException(e);
+    }
   }
 
   private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index 929a7ce4f47..6c4c6282a7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
@@ -125,7 +126,15 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
 
   @Override
   public TSStatus recordMsg(String pipeName, PipeMessage message) {
-    return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, "method not supported");
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      TRecordPipeMessageReq req =
+          new TRecordPipeMessageReq(pipeName, message.serializeToByteBuffer());
+      return configNodeClient.recordPipeMessage(req);
+    } catch (Exception e) {
+      LOGGER.error("RecordMsg error because {}", e.getMessage(), e);
+      return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, e.getMessage());
+    }
   }
 
   // endregion
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 13f3461981a..7607d0cedfc 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -600,7 +600,11 @@ struct TGetPathsSetTemplatesResp {
   2: optional list<string> pathList
 }
 
-// Pipe
+// SYNC
+struct TRecordPipeMessageReq{
+  1: required string pipeName
+  2: required binary message
+}
 
 struct TShowPipeInfo {
   1: required string id
@@ -1290,6 +1294,9 @@ service IConfigNodeRPCService {
   /* Get all pipe information. It is used for DataNode registration and restart*/
   TGetAllPipeInfoResp getAllPipeInfo();
 
+  /* Record a message for the pipe named in the request */
+  common.TSStatus recordPipeMessage(TRecordPipeMessageReq req);
+
   // ======================================================
   // TestTools
   // ======================================================