You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/21 15:15:22 UTC

[iotdb] branch revert-IOTDB-5893 created (now 9124e21b8a4)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch revert-IOTDB-5893
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 9124e21b8a4 Revert "[IOTDB-5893] Pipe: PipeLauncher for data node setup process (#9879)"

This branch includes the following new commits:

     new 9124e21b8a4 Revert "[IOTDB-5893] Pipe: PipeLauncher for data node setup process (#9879)"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Revert "[IOTDB-5893] Pipe: PipeLauncher for data node setup process (#9879)"

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch revert-IOTDB-5893
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9124e21b8a49206a054b0c3fee41f071fb97bb14
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 21 23:14:49 2023 +0800

    Revert "[IOTDB-5893] Pipe: PipeLauncher for data node setup process (#9879)"
    
    This reverts commit 6f3f4e0d92f9b01c76990e14490d62c5b6657ad2.
---
 .../iotdb/confignode/manager/ConfigManager.java    |  11 +-
 .../apache/iotdb/confignode/manager/IManager.java  |   8 +
 .../iotdb/confignode/manager/pipe/PipeManager.java |   3 -
 .../manager/pipe/{runtime => }/PipeMetaSyncer.java |   2 +-
 .../pipe/{plugin => }/PipePluginCoordinator.java   |   2 +-
 .../pipe/{runtime => }/PipeRuntimeCoordinator.java |   2 +-
 .../pipe/{task => }/PipeTaskCoordinator.java       |  23 +--
 .../pipe/plugin/CreatePipePluginProcedure.java     |   2 +-
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |   2 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   7 +
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  17 +++
 .../iotdb/db/pipe/agent/runtime/PipeLauncher.java  | 166 ---------------------
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   9 --
 .../java/org/apache/iotdb/db/service/DataNode.java |  94 +++++++++++-
 .../db/sync/common/ClusterSyncInfoFetcher.java     |  11 +-
 .../src/main/thrift/confignode.thrift              |   9 +-
 16 files changed, 170 insertions(+), 198 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a23b6b44412..11d2bd2912e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -144,6 +144,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -1623,10 +1624,18 @@ public class ConfigManager implements IManager {
   public TGetAllPipeInfoResp getAllPipeInfo() {
     TSStatus status = confirmLeader();
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        ? pipeManager.getPipeTaskCoordinator().getAllPipeInfo()
+        ? pipeManager.getPipeTaskCoordinator().showPipes()
         : new TGetAllPipeInfoResp().setStatus(status);
   }
 
+  @Override
+  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? pipeManager.getPipeTaskCoordinator().recordPipeMessage(req)
+        : status;
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
     TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 5eea6b30fdf..4a305f822bc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -87,6 +87,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -597,6 +598,13 @@ public interface IManager {
    */
   TGetAllPipeInfoResp getAllPipeInfo();
 
+  /**
+   * Record PipeMessage
+   *
+   * @return TSStatus
+   */
+  TSStatus recordPipeMessage(TRecordPipeMessageReq req);
+
   /**
    * Get RegionId。used for Show cluster slots information in
    * docs/zh/UserGuide/Cluster/Cluster-Maintenance.md.
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
index be9440796fa..ec9ffd63009 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
@@ -20,9 +20,6 @@
 package org.apache.iotdb.confignode.manager.pipe;
 
 import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
-import org.apache.iotdb.confignode.manager.pipe.runtime.PipeRuntimeCoordinator;
-import org.apache.iotdb.confignode.manager.pipe.task.PipeTaskCoordinator;
 import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
 
 public class PipeManager {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
index ba9bba61c6a..515abfdbdd1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.runtime;
+package org.apache.iotdb.confignode.manager.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
index 7435344edbf..8943a28e8c2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.plugin;
+package org.apache.iotdb.confignode.manager.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
index 9a117a0fbd1..81df8587176 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.runtime;
+package org.apache.iotdb.confignode.manager.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
similarity index 91%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
index 573c1cb1a80..e01f4b785bc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.pipe.task;
+package org.apache.iotdb.confignode.manager.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
@@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -75,14 +76,7 @@ public class PipeTaskCoordinator {
     return configManager.getProcedureManager().dropPipe(pipeName);
   }
 
-  public TShowPipeResp showPipes(TShowPipeReq req) {
-    return ((PipeTableResp)
-            configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
-        .filter(req.whereClause, req.pipeName)
-        .convertToTShowPipeResp();
-  }
-
-  public TGetAllPipeInfoResp getAllPipeInfo() {
+  public TGetAllPipeInfoResp showPipes() {
     try {
       return ((PipeTableResp)
               configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
@@ -95,4 +89,15 @@ public class PipeTaskCoordinator {
           Collections.emptyList());
     }
   }
+
+  public TShowPipeResp showPipes(TShowPipeReq req) {
+    return ((PipeTableResp)
+            configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
+        .filter(req.whereClause, req.pipeName)
+        .convertToTShowPipeResp();
+  }
+
+  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index 88f40bdc114..b4790661278 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index d44df9ac580..7c8268ecd02 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
 
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
-import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index c2b44e07de1..22367d7c164 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -127,6 +127,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -908,6 +909,12 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return configManager.getAllPipeInfo();
   }
 
+  @Override
+  @Deprecated
+  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
+    return configManager.recordPipeMessage(req);
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
     if (req.isSetTimeStamp() && req.getType() != TConsensusGroupType.DataRegion) {
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 4376f6a4eec..ca35b446fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -96,6 +96,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -1879,6 +1880,22 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  @Override
+  public TSStatus recordPipeMessage(TRecordPipeMessageReq req) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSStatus status = client.recordPipeMessage(req);
+        if (!updateConfigNodeLeader(status)) {
+          return status;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      waitAndReconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
deleted file mode 100644
index 2d0d45ebc02..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.agent.runtime;
-
-import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
-import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
-import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
-import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
-import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
-import org.apache.iotdb.db.client.ConfigNodeClient;
-import org.apache.iotdb.db.client.ConfigNodeClientManager;
-import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.service.ResourcesInformationHolder;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.TException;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class PipeLauncher {
-
-  private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
-
-  public void launchPipePluginAgent(ResourcesInformationHolder resourcesInformationHolder)
-      throws StartupException {
-    initPipePluginRelatedInstances();
-
-    if (resourcesInformationHolder.getPipePluginMetaList() == null
-        || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) {
-      return;
-    }
-
-    final List<PipePluginMeta> uninstalledOrConflictedPipePluginMetaList =
-        getUninstalledOrConflictedPipePluginMetaList(resourcesInformationHolder);
-    int index = 0;
-    while (index < uninstalledOrConflictedPipePluginMetaList.size()) {
-      List<PipePluginMeta> curList = new ArrayList<>();
-      int offset = 0;
-      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
-          && index + offset < uninstalledOrConflictedPipePluginMetaList.size()) {
-        curList.add(uninstalledOrConflictedPipePluginMetaList.get(index + offset));
-        offset++;
-      }
-      index += (offset + 1);
-      fetchAndSavePipePluginJars(curList);
-    }
-
-    // create instances of pipe plugins and do registration
-    try {
-      for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
-        if (meta.isBuiltin()) {
-          continue;
-        }
-        PipeAgent.plugin().doRegister(meta);
-      }
-    } catch (Exception e) {
-      throw new StartupException(e);
-    }
-  }
-
-  private void initPipePluginRelatedInstances() throws StartupException {
-    try {
-      PipePluginExecutableManager.setupAndGetInstance(
-          IOTDB_CONFIG.getPipeTemporaryLibDir(), IOTDB_CONFIG.getPipeDir());
-      PipePluginClassLoaderManager.setupAndGetInstance(IOTDB_CONFIG.getPipeDir());
-    } catch (IOException e) {
-      throw new StartupException(e);
-    }
-  }
-
-  private List<PipePluginMeta> getUninstalledOrConflictedPipePluginMetaList(
-      ResourcesInformationHolder resourcesInformationHolder) {
-    final List<PipePluginMeta> pipePluginMetaList = new ArrayList<>();
-    for (PipePluginMeta pipePluginMeta : resourcesInformationHolder.getPipePluginMetaList()) {
-      if (pipePluginMeta.isBuiltin()) {
-        continue;
-      }
-      // If jar does not exist, add current pipePluginMeta to list
-      if (!PipePluginExecutableManager.getInstance()
-          .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
-        pipePluginMetaList.add(pipePluginMeta);
-      } else {
-        try {
-          // local jar has conflicts with jar on config node, add current pipePluginMeta to list
-          if (!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
-            pipePluginMetaList.add(pipePluginMeta);
-          }
-        } catch (PipeManagementException e) {
-          pipePluginMetaList.add(pipePluginMeta);
-        }
-      }
-    }
-    return pipePluginMetaList;
-  }
-
-  private void fetchAndSavePipePluginJars(List<PipePluginMeta> pipePluginMetaList)
-      throws StartupException {
-    try (ConfigNodeClient configNodeClient =
-        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final List<String> jarNameList =
-          pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList());
-      final TGetJarInListResp resp =
-          configNodeClient.getPipePluginJar(new TGetJarInListReq(jarNameList));
-      if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
-        throw new StartupException("Failed to get pipe plugin jar from config node.");
-      }
-      final List<ByteBuffer> jarList = resp.getJarList();
-      for (int i = 0; i < pipePluginMetaList.size(); i++) {
-        PipePluginExecutableManager.getInstance()
-            .saveToInstallDir(jarList.get(i), pipePluginMetaList.get(i).getJarName());
-      }
-    } catch (IOException | TException | ClientManagerException e) {
-      throw new StartupException(e);
-    }
-  }
-
-  public void launchPipeTaskAgent() throws StartupException {
-    try (final ConfigNodeClient configNodeClient =
-        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
-      final TGetAllPipeInfoResp getAllPipeInfoResp = configNodeClient.getAllPipeInfo();
-      if (getAllPipeInfoResp.getStatus().getCode()
-          == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
-        throw new StartupException("Failed to get pipe task meta from config node.");
-      }
-
-      PipeAgent.task()
-          .handlePipeMetaChanges(
-              getAllPipeInfoResp.getAllPipeInfo().stream()
-                  .map(PipeMeta::deserialize)
-                  .collect(Collectors.toList()));
-    } catch (StartupException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new StartupException(e);
-    }
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 2d85aac9e8c..1160ff4a845 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -19,9 +19,7 @@
 
 package org.apache.iotdb.db.pipe.agent.runtime;
 
-import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
-import org.apache.iotdb.db.service.ResourcesInformationHolder;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
 
 import org.slf4j.Logger;
@@ -31,13 +29,6 @@ public class PipeRuntimeAgent {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class);
 
-  public synchronized void launch(ResourcesInformationHolder resourcesInformationHolder)
-      throws StartupException {
-    final PipeLauncher pipeLauncher = new PipeLauncher();
-    pipeLauncher.launchPipePluginAgent(resourcesInformationHolder);
-    pipeLauncher.launchPipeTaskAgent();
-  }
-
   public void report(PipeSubtask subtask) {
     // TODO: terminate the task by the given taskID
     LOGGER.warn(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 44a5690db8c..3e0f39ac4ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.service.metric.MetricService;
@@ -82,6 +84,7 @@ import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.utils.WALMode;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.utils.InternalReporterType;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.udf.api.exception.UDFManagementException;
 
@@ -458,7 +461,7 @@ public class DataNode implements DataNodeMBean {
   private void prepareResources() throws StartupException {
     prepareUDFResources();
     prepareTriggerResources();
-    preparePipeResources();
+    preparePipePluginResources();
   }
 
   /** register services and set up DataNode */
@@ -833,8 +836,93 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
-  private void preparePipeResources() throws StartupException {
-    PipeAgent.runtime().launch(resourcesInformationHolder);
+  private void preparePipePluginResources() throws StartupException {
+    initPipePluginRelatedInstance();
+    if (resourcesInformationHolder.getPipePluginMetaList() == null
+        || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) {
+      return;
+    }
+
+    // get jars from config node
+    List<PipePluginMeta> pipePluginNeedJarList = getJarListForPipePlugin();
+    int index = 0;
+    while (index < pipePluginNeedJarList.size()) {
+      List<PipePluginMeta> curList = new ArrayList<>();
+      int offset = 0;
+      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
+          && index + offset < pipePluginNeedJarList.size()) {
+        curList.add(pipePluginNeedJarList.get(index + offset));
+        offset++;
+      }
+      index += (offset + 1);
+      getJarOfPipePlugins(curList);
+    }
+
+    // create instances of pipe plugins and do registration
+    try {
+      for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
+        if (meta.isBuiltin()) {
+          continue;
+        }
+        PipeAgent.plugin().doRegister(meta);
+      }
+    } catch (Exception e) {
+      throw new StartupException(e);
+    }
+  }
+
+  private void initPipePluginRelatedInstance() throws StartupException {
+    try {
+      PipePluginExecutableManager.setupAndGetInstance(
+          config.getPipeTemporaryLibDir(), config.getPipeDir());
+      PipePluginClassLoaderManager.setupAndGetInstance(config.getPipeDir());
+    } catch (IOException e) {
+      throw new StartupException(e);
+    }
+  }
+
+  private List<PipePluginMeta> getJarListForPipePlugin() {
+    List<PipePluginMeta> res = new ArrayList<>();
+    for (PipePluginMeta pipePluginMeta : resourcesInformationHolder.getPipePluginMetaList()) {
+      if (pipePluginMeta.isBuiltin()) {
+        continue;
+      }
+      // If jar does not exist, add current pipePluginMeta to list
+      if (!PipePluginExecutableManager.getInstance()
+          .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
+        res.add(pipePluginMeta);
+      } else {
+        try {
+          // local jar has conflicts with jar on config node, add current pipePluginMeta to list
+          if (!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
+            res.add(pipePluginMeta);
+          }
+        } catch (PipeManagementException e) {
+          res.add(pipePluginMeta);
+        }
+      }
+    }
+    return res;
+  }
+
+  private void getJarOfPipePlugins(List<PipePluginMeta> pipePluginMetaList)
+      throws StartupException {
+    try (ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      List<String> jarNameList =
+          pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList());
+      TGetJarInListResp resp = configNodeClient.getPipePluginJar(new TGetJarInListReq(jarNameList));
+      if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+        throw new StartupException("Failed to get pipe plugin jar from config node.");
+      }
+      List<ByteBuffer> jarList = resp.getJarList();
+      for (int i = 0; i < pipePluginMetaList.size(); i++) {
+        PipePluginExecutableManager.getInstance()
+            .saveToInstallDir(jarList.get(i), pipePluginMetaList.get(i).getJarName());
+      }
+    } catch (IOException | TException | ClientManagerException e) {
+      throw new StartupException(e);
+    }
   }
 
   private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index 929a7ce4f47..6c4c6282a7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.sync.pipesink.PipeSink;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
@@ -125,7 +126,15 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
 
   @Override
   public TSStatus recordMsg(String pipeName, PipeMessage message) {
-    return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, "method not supported");
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      TRecordPipeMessageReq req =
+          new TRecordPipeMessageReq(pipeName, message.serializeToByteBuffer());
+      return configNodeClient.recordPipeMessage(req);
+    } catch (Exception e) {
+      LOGGER.error("RecordMsg error because {}", e.getMessage(), e);
+      return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, e.getMessage());
+    }
   }
 
   // endregion
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 13f3461981a..7607d0cedfc 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -600,7 +600,11 @@ struct TGetPathsSetTemplatesResp {
   2: optional list<string> pathList
 }
 
-// Pipe
+// SYNC
+struct TRecordPipeMessageReq{
+  1: required string pipeName
+  2: required binary message
+}
 
 struct TShowPipeInfo {
   1: required string id
@@ -1290,6 +1294,9 @@ service IConfigNodeRPCService {
   /* Get all pipe information. It is used for DataNode registration and restart*/
   TGetAllPipeInfoResp getAllPipeInfo();
 
+  /* Get all pipe information. It is used for DataNode registration and restart*/
+  common.TSStatus recordPipeMessage(TRecordPipeMessageReq req);
+
   // ======================================================
   // TestTools
   // ======================================================