You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/05/21 15:15:23 UTC
[iotdb] 01/01: Revert "[IOTDB-5893] Pipe: PipeLauncher for data node setup process (#9879)"
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch revert-IOTDB-5893
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9124e21b8a49206a054b0c3fee41f071fb97bb14
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun May 21 23:14:49 2023 +0800
Revert "[IOTDB-5893] Pipe: PipeLauncher for data node setup process (#9879)"
This reverts commit 6f3f4e0d92f9b01c76990e14490d62c5b6657ad2.
---
.../iotdb/confignode/manager/ConfigManager.java | 11 +-
.../apache/iotdb/confignode/manager/IManager.java | 8 +
.../iotdb/confignode/manager/pipe/PipeManager.java | 3 -
.../manager/pipe/{runtime => }/PipeMetaSyncer.java | 2 +-
.../pipe/{plugin => }/PipePluginCoordinator.java | 2 +-
.../pipe/{runtime => }/PipeRuntimeCoordinator.java | 2 +-
.../pipe/{task => }/PipeTaskCoordinator.java | 23 +--
.../pipe/plugin/CreatePipePluginProcedure.java | 2 +-
.../impl/pipe/plugin/DropPipePluginProcedure.java | 2 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 7 +
.../apache/iotdb/db/client/ConfigNodeClient.java | 17 +++
.../iotdb/db/pipe/agent/runtime/PipeLauncher.java | 166 ---------------------
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 9 --
.../java/org/apache/iotdb/db/service/DataNode.java | 94 +++++++++++-
.../db/sync/common/ClusterSyncInfoFetcher.java | 11 +-
.../src/main/thrift/confignode.thrift | 9 +-
16 files changed, 170 insertions(+), 198 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a23b6b44412..11d2bd2912e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -144,6 +144,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -1623,10 +1624,18 @@ public class ConfigManager implements IManager {
public TGetAllPipeInfoResp getAllPipeInfo() {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- ? pipeManager.getPipeTaskCoordinator().getAllPipeInfo()
+ ? pipeManager.getPipeTaskCoordinator().showPipes()
: new TGetAllPipeInfoResp().setStatus(status);
}
+ @Override
+ public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? pipeManager.getPipeTaskCoordinator().recordPipeMessage(req)
+ : status;
+ }
+
@Override
public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 5eea6b30fdf..4a305f822bc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -87,6 +87,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -597,6 +598,13 @@ public interface IManager {
*/
TGetAllPipeInfoResp getAllPipeInfo();
+ /**
+ * Record the pipe message carried by the given request.
+ *
+ * @return TSStatus
+ */
+ TSStatus recordPipeMessage(TRecordPipeMessageReq req);
+
/**
* Get RegionId. Used for showing cluster slots information in
* docs/zh/UserGuide/Cluster/Cluster-Maintenance.md.
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
index be9440796fa..ec9ffd63009 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeManager.java
@@ -20,9 +20,6 @@
package org.apache.iotdb.confignode.manager.pipe;
import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
-import org.apache.iotdb.confignode.manager.pipe.runtime.PipeRuntimeCoordinator;
-import org.apache.iotdb.confignode.manager.pipe.task.PipeTaskCoordinator;
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
public class PipeManager {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
index ba9bba61c6a..515abfdbdd1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeMetaSyncer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.manager.pipe.runtime;
+package org.apache.iotdb.confignode.manager.pipe;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
index 7435344edbf..8943a28e8c2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.manager.pipe.plugin;
+package org.apache.iotdb.confignode.manager.pipe;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
index 9a117a0fbd1..81df8587176 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeRuntimeCoordinator.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.confignode.manager.pipe.runtime;
+package org.apache.iotdb.confignode.manager.pipe;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
similarity index 91%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
index 573c1cb1a80..e01f4b785bc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipeTaskCoordinator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.pipe.task;
+package org.apache.iotdb.confignode.manager.pipe;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
@@ -25,6 +25,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -75,14 +76,7 @@ public class PipeTaskCoordinator {
return configManager.getProcedureManager().dropPipe(pipeName);
}
- public TShowPipeResp showPipes(TShowPipeReq req) {
- return ((PipeTableResp)
- configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
- .filter(req.whereClause, req.pipeName)
- .convertToTShowPipeResp();
- }
-
- public TGetAllPipeInfoResp getAllPipeInfo() {
+ public TGetAllPipeInfoResp showPipes() {
try {
return ((PipeTableResp)
configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
@@ -95,4 +89,15 @@ public class PipeTaskCoordinator {
Collections.emptyList());
}
}
+
+ public TShowPipeResp showPipes(TShowPipeReq req) {
+ return ((PipeTableResp)
+ configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
+ .filter(req.whereClause, req.pipeName)
+ .convertToTShowPipeResp();
+ }
+
+ public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index 88f40bdc114..b4790661278 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index d44df9ac580..7c8268ecd02 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
-import org.apache.iotdb.confignode.manager.pipe.plugin.PipePluginCoordinator;
+import org.apache.iotdb.confignode.manager.pipe.PipePluginCoordinator;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index c2b44e07de1..22367d7c164 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -127,6 +127,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -908,6 +909,12 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return configManager.getAllPipeInfo();
}
+ @Override
+ @Deprecated
+ public TSStatus recordPipeMessage(TRecordPipeMessageReq req) {
+ return configManager.recordPipeMessage(req);
+ }
+
@Override
public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
if (req.isSetTimeStamp() && req.getType() != TConsensusGroupType.DataRegion) {
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 4376f6a4eec..ca35b446fc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -96,6 +96,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
@@ -1879,6 +1880,22 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TSStatus recordPipeMessage(TRecordPipeMessageReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.recordPipeMessage(req);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ waitAndReconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
deleted file mode 100644
index 2d0d45ebc02..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeLauncher.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.agent.runtime;
-
-import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
-import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
-import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
-import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
-import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
-import org.apache.iotdb.db.client.ConfigNodeClient;
-import org.apache.iotdb.db.client.ConfigNodeClientManager;
-import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.service.ResourcesInformationHolder;
-import org.apache.iotdb.pipe.api.exception.PipeManagementException;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.thrift.TException;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class PipeLauncher {
-
- private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
-
- public void launchPipePluginAgent(ResourcesInformationHolder resourcesInformationHolder)
- throws StartupException {
- initPipePluginRelatedInstances();
-
- if (resourcesInformationHolder.getPipePluginMetaList() == null
- || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) {
- return;
- }
-
- final List<PipePluginMeta> uninstalledOrConflictedPipePluginMetaList =
- getUninstalledOrConflictedPipePluginMetaList(resourcesInformationHolder);
- int index = 0;
- while (index < uninstalledOrConflictedPipePluginMetaList.size()) {
- List<PipePluginMeta> curList = new ArrayList<>();
- int offset = 0;
- while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
- && index + offset < uninstalledOrConflictedPipePluginMetaList.size()) {
- curList.add(uninstalledOrConflictedPipePluginMetaList.get(index + offset));
- offset++;
- }
- index += (offset + 1);
- fetchAndSavePipePluginJars(curList);
- }
-
- // create instances of pipe plugins and do registration
- try {
- for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
- if (meta.isBuiltin()) {
- continue;
- }
- PipeAgent.plugin().doRegister(meta);
- }
- } catch (Exception e) {
- throw new StartupException(e);
- }
- }
-
- private void initPipePluginRelatedInstances() throws StartupException {
- try {
- PipePluginExecutableManager.setupAndGetInstance(
- IOTDB_CONFIG.getPipeTemporaryLibDir(), IOTDB_CONFIG.getPipeDir());
- PipePluginClassLoaderManager.setupAndGetInstance(IOTDB_CONFIG.getPipeDir());
- } catch (IOException e) {
- throw new StartupException(e);
- }
- }
-
- private List<PipePluginMeta> getUninstalledOrConflictedPipePluginMetaList(
- ResourcesInformationHolder resourcesInformationHolder) {
- final List<PipePluginMeta> pipePluginMetaList = new ArrayList<>();
- for (PipePluginMeta pipePluginMeta : resourcesInformationHolder.getPipePluginMetaList()) {
- if (pipePluginMeta.isBuiltin()) {
- continue;
- }
- // If jar does not exist, add current pipePluginMeta to list
- if (!PipePluginExecutableManager.getInstance()
- .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
- pipePluginMetaList.add(pipePluginMeta);
- } else {
- try {
- // local jar has conflicts with jar on config node, add current pipePluginMeta to list
- if (!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
- pipePluginMetaList.add(pipePluginMeta);
- }
- } catch (PipeManagementException e) {
- pipePluginMetaList.add(pipePluginMeta);
- }
- }
- }
- return pipePluginMetaList;
- }
-
- private void fetchAndSavePipePluginJars(List<PipePluginMeta> pipePluginMetaList)
- throws StartupException {
- try (ConfigNodeClient configNodeClient =
- ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final List<String> jarNameList =
- pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList());
- final TGetJarInListResp resp =
- configNodeClient.getPipePluginJar(new TGetJarInListReq(jarNameList));
- if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
- throw new StartupException("Failed to get pipe plugin jar from config node.");
- }
- final List<ByteBuffer> jarList = resp.getJarList();
- for (int i = 0; i < pipePluginMetaList.size(); i++) {
- PipePluginExecutableManager.getInstance()
- .saveToInstallDir(jarList.get(i), pipePluginMetaList.get(i).getJarName());
- }
- } catch (IOException | TException | ClientManagerException e) {
- throw new StartupException(e);
- }
- }
-
- public void launchPipeTaskAgent() throws StartupException {
- try (final ConfigNodeClient configNodeClient =
- ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TGetAllPipeInfoResp getAllPipeInfoResp = configNodeClient.getAllPipeInfo();
- if (getAllPipeInfoResp.getStatus().getCode()
- == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
- throw new StartupException("Failed to get pipe task meta from config node.");
- }
-
- PipeAgent.task()
- .handlePipeMetaChanges(
- getAllPipeInfoResp.getAllPipeInfo().stream()
- .map(PipeMeta::deserialize)
- .collect(Collectors.toList()));
- } catch (StartupException e) {
- throw e;
- } catch (Exception e) {
- throw new StartupException(e);
- }
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 2d85aac9e8c..1160ff4a845 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -19,9 +19,7 @@
package org.apache.iotdb.db.pipe.agent.runtime;
-import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
-import org.apache.iotdb.db.service.ResourcesInformationHolder;
import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
import org.slf4j.Logger;
@@ -31,13 +29,6 @@ public class PipeRuntimeAgent {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class);
- public synchronized void launch(ResourcesInformationHolder resourcesInformationHolder)
- throws StartupException {
- final PipeLauncher pipeLauncher = new PipeLauncher();
- pipeLauncher.launchPipePluginAgent(resourcesInformationHolder);
- pipeLauncher.launchPipeTaskAgent();
- }
-
public void report(PipeSubtask subtask) {
// TODO: terminate the task by the given taskID
LOGGER.warn(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 44a5690db8c..3e0f39ac4ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.service.metric.MetricService;
@@ -82,6 +84,7 @@ import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.InternalReporterType;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.udf.api.exception.UDFManagementException;
@@ -458,7 +461,7 @@ public class DataNode implements DataNodeMBean {
private void prepareResources() throws StartupException {
prepareUDFResources();
prepareTriggerResources();
- preparePipeResources();
+ preparePipePluginResources();
}
/** register services and set up DataNode */
@@ -833,8 +836,93 @@ public class DataNode implements DataNodeMBean {
}
}
- private void preparePipeResources() throws StartupException {
- PipeAgent.runtime().launch(resourcesInformationHolder);
+ private void preparePipePluginResources() throws StartupException {
+ initPipePluginRelatedInstance();
+ if (resourcesInformationHolder.getPipePluginMetaList() == null
+ || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) {
+ return;
+ }
+
+ // get jars from config node
+ List<PipePluginMeta> pipePluginNeedJarList = getJarListForPipePlugin();
+ int index = 0;
+ while (index < pipePluginNeedJarList.size()) {
+ List<PipePluginMeta> curList = new ArrayList<>();
+ int offset = 0;
+ while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
+ && index + offset < pipePluginNeedJarList.size()) {
+ curList.add(pipePluginNeedJarList.get(index + offset));
+ offset++;
+ }
+ index += (offset + 1);
+ getJarOfPipePlugins(curList);
+ }
+
+ // create instances of pipe plugins and do registration
+ try {
+ for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
+ if (meta.isBuiltin()) {
+ continue;
+ }
+ PipeAgent.plugin().doRegister(meta);
+ }
+ } catch (Exception e) {
+ throw new StartupException(e);
+ }
+ }
+
+ private void initPipePluginRelatedInstance() throws StartupException {
+ try {
+ PipePluginExecutableManager.setupAndGetInstance(
+ config.getPipeTemporaryLibDir(), config.getPipeDir());
+ PipePluginClassLoaderManager.setupAndGetInstance(config.getPipeDir());
+ } catch (IOException e) {
+ throw new StartupException(e);
+ }
+ }
+
+ private List<PipePluginMeta> getJarListForPipePlugin() {
+ List<PipePluginMeta> res = new ArrayList<>();
+ for (PipePluginMeta pipePluginMeta : resourcesInformationHolder.getPipePluginMetaList()) {
+ if (pipePluginMeta.isBuiltin()) {
+ continue;
+ }
+ // If jar does not exist, add current pipePluginMeta to list
+ if (!PipePluginExecutableManager.getInstance()
+ .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
+ res.add(pipePluginMeta);
+ } else {
+ try {
+ // local jar has conflicts with jar on config node, add current pipePluginMeta to list
+ if (!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
+ res.add(pipePluginMeta);
+ }
+ } catch (PipeManagementException e) {
+ res.add(pipePluginMeta);
+ }
+ }
+ }
+ return res;
+ }
+
+ private void getJarOfPipePlugins(List<PipePluginMeta> pipePluginMetaList)
+ throws StartupException {
+ try (ConfigNodeClient configNodeClient =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ List<String> jarNameList =
+ pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList());
+ TGetJarInListResp resp = configNodeClient.getPipePluginJar(new TGetJarInListReq(jarNameList));
+ if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+ throw new StartupException("Failed to get pipe plugin jar from config node.");
+ }
+ List<ByteBuffer> jarList = resp.getJarList();
+ for (int i = 0; i < pipePluginMetaList.size(); i++) {
+ PipePluginExecutableManager.getInstance()
+ .saveToInstallDir(jarList.get(i), pipePluginMetaList.get(i).getJarName());
+ }
+ } catch (IOException | TException | ClientManagerException e) {
+ throw new StartupException(e);
+ }
}
private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index 929a7ce4f47..6c4c6282a7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
@@ -125,7 +126,15 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
@Override
public TSStatus recordMsg(String pipeName, PipeMessage message) {
- return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, "method not supported");
+ try (ConfigNodeClient configNodeClient =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ TRecordPipeMessageReq req =
+ new TRecordPipeMessageReq(pipeName, message.serializeToByteBuffer());
+ return configNodeClient.recordPipeMessage(req);
+ } catch (Exception e) {
+ LOGGER.error("RecordMsg error because {}", e.getMessage(), e);
+ return RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, e.getMessage());
+ }
}
// endregion
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 13f3461981a..7607d0cedfc 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -600,7 +600,11 @@ struct TGetPathsSetTemplatesResp {
2: optional list<string> pathList
}
-// Pipe
+// SYNC
+struct TRecordPipeMessageReq{
+ 1: required string pipeName
+ 2: required binary message
+}
struct TShowPipeInfo {
1: required string id
@@ -1290,6 +1294,9 @@ service IConfigNodeRPCService {
/* Get all pipe information. It is used for DataNode registration and restart*/
TGetAllPipeInfoResp getAllPipeInfo();
+ /* Record a message for the pipe named in the request. */
+ common.TSStatus recordPipeMessage(TRecordPipeMessageReq req);
+
// ======================================================
// TestTools
// ======================================================