You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/03/18 05:05:11 UTC
[iotdb] branch master updated: [IOTDB-5690] PipePlugin: CreatePipePluginProcedure & DropPipePluginProcedure (#9363)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d0e2b3f108 [IOTDB-5690] PipePlugin: CreatePipePluginProcedure & DropPipePluginProcedure (#9363)
d0e2b3f108 is described below
commit d0e2b3f108ab6caa6389b24923e96788edf42274
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Sat Mar 18 13:05:04 2023 +0800
[IOTDB-5690] PipePlugin: CreatePipePluginProcedure & DropPipePluginProcedure (#9363)
Signed-off-by: Steve Yurong Su <ro...@apache.org>
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
.../client/async/AsyncDataNodeClientPool.java | 14 +
.../consensus/request/ConfigPhysicalPlan.java | 16 ++
.../write/pipe/plugin/CreatePipePluginPlan.java | 4 +
.../iotdb/confignode/manager/ConfigManager.java | 5 +
.../apache/iotdb/confignode/manager/IManager.java | 8 +
.../iotdb/confignode/manager/ProcedureManager.java | 46 ++++
.../iotdb/confignode/manager/node/NodeManager.java | 9 +
.../manager/pipe/PipePluginCoordinator.java | 120 +-------
.../procedure/env/ConfigNodeProcedureEnv.java | 31 +++
.../pipe/plugin/CreatePipePluginProcedure.java | 306 +++++++++++++++++++++
.../impl/pipe/plugin/DropPipePluginProcedure.java | 251 +++++++++++++++++
.../state/pipe/plugin/CreatePipePluginState.java | 27 ++
.../state/pipe/plugin/DropPipePluginState.java | 27 ++
.../procedure/store/ProcedureFactory.java | 12 +
.../confignode/procedure/store/ProcedureType.java | 6 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 5 +
.../impl/pipe/CreatePipePluginProcedureTest.java | 60 ++++
.../impl/pipe/DropPipePluginProcedureTest.java | 53 ++++
.../confignode1conf/iotdb-common.properties | 1 +
.../confignode2conf/iotdb-common.properties | 1 +
.../confignode3conf/iotdb-common.properties | 1 +
.../resources/conf/iotdb-common.properties | 9 +
.../service/PipePluginClassLoaderManager.java | 2 +-
.../commons/udf/service/UDFManagementService.java | 2 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 16 ++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 24 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 7 +
.../iotdb/db/pipe/agent/PipePluginAgent.java | 10 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 110 +++++++-
.../db/service/ResourcesInformationHolder.java | 12 +
.../datanode1conf/iotdb-common.properties | 3 +-
.../datanode2conf/iotdb-common.properties | 3 +-
.../datanode3conf/iotdb-common.properties | 3 +-
.../src/main/thrift/confignode.thrift | 6 +
34 files changed, 1087 insertions(+), 123 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index ce07676532..79496235f5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -41,12 +41,14 @@ import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeactivateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteSchemaReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
@@ -203,6 +205,18 @@ public class AsyncDataNodeClientPool {
(AsyncTSStatusRPCHandler)
clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
break;
+ case CREATE_PIPE_PLUGIN:
+ client.createPipePlugin(
+ (TCreatePipePluginInstanceReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
+ case DROP_PIPE_PLUGIN:
+ client.dropPipePlugin(
+ (TDropPipePluginInstanceReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
case MERGE:
case FULL_MERGE:
client.merge(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 62d83e48f3..bb3dc7d4e3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -33,6 +33,8 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateS
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan;
+import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
@@ -73,6 +75,8 @@ import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelStat
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
@@ -414,6 +418,18 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case ShowTrail:
plan = new ShowTrailPlan();
break;
+ case CreatePipePlugin:
+ plan = new CreatePipePluginPlan();
+ break;
+ case DropPipePlugin:
+ plan = new DropPipePluginPlan();
+ break;
+ case GetPipePluginTable:
+ plan = new GetPipePluginTablePlan();
+ break;
+ case GetPipePluginJar:
+ plan = new GetPipePluginJarPlan();
+ break;
default:
throw new IOException("unknown PhysicalPlan configPhysicalPlanType: " + planType);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/plugin/CreatePipePluginPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/plugin/CreatePipePluginPlan.java
index 35fb68272d..ce4984228c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/plugin/CreatePipePluginPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/plugin/CreatePipePluginPlan.java
@@ -34,6 +34,10 @@ public class CreatePipePluginPlan extends ConfigPhysicalPlan {
private Binary jarFile;
+ public CreatePipePluginPlan() {
+ super(ConfigPhysicalPlanType.CreatePipePlugin);
+ }
+
public CreatePipePluginPlan(PipePluginMeta pipePluginMeta, Binary jarFile) {
super(ConfigPhysicalPlanType.CreatePipePlugin);
this.pipePluginMeta = pipePluginMeta;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 1f1d825e88..d92883c14f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -927,6 +927,11 @@ public class ConfigManager implements IManager {
return modelManager;
}
+ @Override
+ public PipeManager getPipeManager() {
+ return pipeManager;
+ }
+
@Override
public TSStatus operatePermission(AuthorPlan authorPlan) {
TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index eaa7ccbe92..75aaaf9a5b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.confignode.manager.cq.CQManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.pipe.PipeManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
@@ -187,6 +188,13 @@ public interface IManager {
*/
CQManager getCQManager();
+ /**
+ * Get PipeManager
+ *
+ * @return PipeManager instance
+ */
+ PipeManager getPipeManager();
+
/**
* Get RetryFailedTasksThread
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 7084c01aac..63a78d36c1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.exception.sync.PipeException;
import org.apache.iotdb.commons.model.ModelInformation;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
@@ -51,6 +52,8 @@ import org.apache.iotdb.confignode.procedure.impl.model.DropModelProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
@@ -576,6 +579,49 @@ public class ProcedureManager {
}
}
+ public TSStatus createPipePlugin(PipePluginMeta pipePluginMeta, byte[] jarFile) {
+ final CreatePipePluginProcedure createPipePluginProcedure =
+ new CreatePipePluginProcedure(pipePluginMeta, jarFile);
+ try {
+ if (jarFile != null
+ && new UpdateProcedurePlan(createPipePluginProcedure).getSerializedSize()
+ > planSizeLimit) {
+ return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
+ .setMessage(
+ String.format(
+ "Fail to create pipe plugin[%s], the size of Jar is too large, you can increase the value of property 'config_node_ratis_log_appender_buffer_size_max' on ConfigNode",
+ pipePluginMeta.getPluginName()));
+ }
+ } catch (IOException e) {
+ return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
+ .setMessage(e.getMessage());
+ }
+
+ long procedureId = executor.submitProcedure(createPipePluginProcedure);
+ List<TSStatus> statusList = new ArrayList<>();
+ boolean isSucceed =
+ waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+ if (isSucceed) {
+ return RpcUtils.SUCCESS_STATUS;
+ } else {
+ return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
+ .setMessage(statusList.get(0).getMessage());
+ }
+ }
+
+ public TSStatus dropPipePlugin(String pluginName) {
+ long procedureId = executor.submitProcedure(new DropPipePluginProcedure(pluginName));
+ List<TSStatus> statusList = new ArrayList<>();
+ boolean isSucceed =
+ waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
+ if (isSucceed) {
+ return RpcUtils.SUCCESS_STATUS;
+ } else {
+ return new TSStatus(TSStatusCode.DROP_PIPE_PLUGIN_ERROR.getStatusCode())
+ .setMessage(statusList.get(0).getMessage());
+ }
+ }
+
public TSStatus createPipe(TCreatePipeReq req) {
try {
long procedureId = executor.submitProcedure(new CreatePipeProcedure(req));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index fb822a9abb..b502a25e3a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCac
import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.manager.pipe.PipeManager;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
@@ -231,6 +232,7 @@ public class NodeManager {
}
private TRuntimeConfiguration getRuntimeConfiguration() {
+ getPipeManager().getPipePluginCoordinator().getPipePluginInfo().acquirePipePluginInfoLock();
getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
getUDFManager().getUdfInfo().acquireUDFTableLock();
@@ -241,12 +243,15 @@ public class NodeManager {
getTriggerManager().getTriggerTable(false).getAllTriggerInformation());
runtimeConfiguration.setAllUDFInformation(
getUDFManager().getUDFTable().getAllUDFInformation());
+ runtimeConfiguration.setAllPipeInformation(
+ getPipeManager().getPipePluginCoordinator().getPipePluginTable().getAllPipePluginMeta());
runtimeConfiguration.setAllTTLInformation(
DataNodeRegisterResp.convertAllTTLInformation(getClusterSchemaManager().getAllTTLInfo()));
return runtimeConfiguration;
} finally {
getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
getUDFManager().getUdfInfo().releaseUDFTableLock();
+ getPipeManager().getPipePluginCoordinator().getPipePluginInfo().releasePipePluginInfoLock();
}
}
@@ -978,6 +983,10 @@ public class NodeManager {
return configManager.getTriggerManager();
}
+ private PipeManager getPipeManager() {
+ return configManager.getPipeManager();
+ }
+
private UDFManager getUDFManager() {
return configManager.getUDFManager();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
index c2cee0cc74..9e45a41dce 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
@@ -19,18 +19,10 @@
package org.apache.iotdb.confignode.manager.pipe;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginTablePlan;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
-import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTableResp;
import org.apache.iotdb.confignode.consensus.response.udf.JarResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
@@ -39,31 +31,18 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
-import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
-import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.utils.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Collections;
-import java.util.List;
-import java.util.Map;
public class PipePluginCoordinator {
private static final Logger LOGGER = LoggerFactory.getLogger(PipePluginCoordinator.class);
- private final long planSizeLimit =
- ConfigNodeDescriptor.getInstance()
- .getConf()
- .getConfigNodeRatisConsensusLogAppenderBufferSize()
- - IoTDBConstant.RAFT_LOG_BASIC_SIZE;
-
private final ConfigManager configManager;
private final PipePluginInfo pipePluginInfo;
@@ -72,108 +51,23 @@ public class PipePluginCoordinator {
this.pipePluginInfo = pipePluginInfo;
}
- public TSStatus createPipePlugin(TCreatePipePluginReq req) {
- pipePluginInfo.acquirePipePluginInfoLock();
- try {
- return doCreatePipePlugin(req);
- } catch (Exception e) {
- LOGGER.warn(e.getMessage(), e);
- return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
- .setMessage(e.getMessage());
- } finally {
- pipePluginInfo.releasePipePluginInfoLock();
- }
+ public PipePluginInfo getPipePluginInfo() {
+ return pipePluginInfo;
}
- private TSStatus doCreatePipePlugin(TCreatePipePluginReq req) throws IOException {
+ public TSStatus createPipePlugin(TCreatePipePluginReq req) {
final String pluginName = req.getPluginName().toUpperCase();
+ final String className = req.getClassName();
final String jarName = req.getJarName();
final String jarMD5 = req.getJarMD5();
-
- pipePluginInfo.validateBeforeCreatingPipePlugin(pluginName, jarName, jarMD5);
- final boolean needToSaveJar =
- pipePluginInfo.isJarNeededToBeSavedWhenCreatingPipePlugin(jarName);
- LOGGER.info(
- "Start to create PipePlugin [{}] on Data Nodes, needToSaveJar[{}]",
- pluginName,
- needToSaveJar);
-
- final byte[] jarFile = req.getJarFile();
final PipePluginMeta pipePluginMeta =
- new PipePluginMeta(pluginName, req.getClassName(), jarName, jarMD5);
-
- // data nodes
- final TSStatus dataNodesStatus =
- RpcUtils.squashResponseStatusList(
- createPipePluginOnDataNodes(pipePluginMeta, needToSaveJar ? jarFile : null));
- if (dataNodesStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return dataNodesStatus;
- }
+ new PipePluginMeta(pluginName, className, jarName, jarMD5);
- // config nodes
- final CreatePipePluginPlan createPluginPlan =
- new CreatePipePluginPlan(pipePluginMeta, needToSaveJar ? new Binary(jarFile) : null);
- if (needToSaveJar && createPluginPlan.getSerializedSize() > planSizeLimit) {
- return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
- .setMessage(
- String.format(
- "Fail to create PipePlugin[%s], the size of Jar is too large, you should increase the value of property 'config_node_ratis_log_appender_buffer_size_max' on ConfigNode",
- pluginName));
- }
- LOGGER.info("Start to add PipePlugin [{}] to PipePluginTable", pluginName);
- return configManager.getConsensusManager().write(createPluginPlan).getStatus();
- }
-
- private List<TSStatus> createPipePluginOnDataNodes(PipePluginMeta pipePluginMeta, byte[] jarFile)
- throws IOException {
- final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
- configManager.getNodeManager().getRegisteredDataNodeLocations();
- final TCreatePipePluginInstanceReq req =
- new TCreatePipePluginInstanceReq(pipePluginMeta.serialize(), ByteBuffer.wrap(jarFile));
-
- final AsyncClientHandler<TCreatePipePluginInstanceReq, TSStatus> clientHandler =
- new AsyncClientHandler<>(DataNodeRequestType.CREATE_PIPE_PLUGIN, req, dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
- return clientHandler.getResponseList();
+ return configManager.getProcedureManager().createPipePlugin(pipePluginMeta, req.getJarFile());
}
public TSStatus dropPipePlugin(String pluginName) {
- pipePluginInfo.acquirePipePluginInfoLock();
- try {
- return doDropPipePlugin(pluginName);
- } catch (Exception e) {
- LOGGER.warn(e.getMessage(), e);
- return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
- .setMessage(e.getMessage());
- } finally {
- pipePluginInfo.releasePipePluginInfoLock();
- }
- }
-
- private TSStatus doDropPipePlugin(String pluginName) {
- pluginName = pluginName.toUpperCase();
- pipePluginInfo.validateBeforeDroppingPipePlugin(pluginName);
-
- TSStatus result = RpcUtils.squashResponseStatusList(dropPipePluginOnDataNodes(pluginName));
- if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return result;
- }
-
- return configManager
- .getConsensusManager()
- .write(new DropPipePluginPlan(pluginName))
- .getStatus();
- }
-
- private List<TSStatus> dropPipePluginOnDataNodes(String pluginName) {
- final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
- configManager.getNodeManager().getRegisteredDataNodeLocations();
- final TDropPipePluginInstanceReq req = new TDropPipePluginInstanceReq(pluginName, false);
-
- final AsyncClientHandler<TDropPipePluginInstanceReq, TSStatus> clientHandler =
- new AsyncClientHandler<>(DataNodeRequestType.DROP_PIPE_PLUGIN, req, dataNodeLocationMap);
- AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
- return clientHandler.getResponseList();
+ return configManager.getProcedureManager().dropPipePlugin(pluginName);
}
public TGetPipePluginTableResp getPipePluginTable() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 4bb874c4ba..efa53d330d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
@@ -60,8 +61,10 @@ import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
@@ -625,6 +628,34 @@ public class ConfigNodeProcedureEnv {
return clientHandler.getResponseList();
}
+ public List<TSStatus> createPipePluginOnDataNodes(PipePluginMeta pipePluginMeta, byte[] jarFile)
+ throws IOException {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ final TCreatePipePluginInstanceReq request =
+ new TCreatePipePluginInstanceReq(pipePluginMeta.serialize(), ByteBuffer.wrap(jarFile));
+
+ final AsyncClientHandler<TCreatePipePluginInstanceReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.CREATE_PIPE_PLUGIN, request, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
+ }
+
+ public List<TSStatus> dropPipePluginOnDataNodes(
+ String pipePluginName, boolean needToDeleteJarFile) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ final TDropPipePluginInstanceReq request =
+ new TDropPipePluginInstanceReq(pipePluginName, needToDeleteJarFile);
+
+ AsyncClientHandler<TDropPipePluginInstanceReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.DROP_PIPE_PLUGIN, request, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
+ }
+
public LockQueue getNodeLock() {
return nodeLock;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
new file mode 100644
index 0000000000..7d2a494fc7
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
+
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.state.pipe.plugin.CreatePipePluginState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * This class extends {@link AbstractNodeProcedure} to make sure that when a {@link
+ * CreatePipePluginProcedure} is executed, the {@link AddConfigNodeProcedure}, {@link
+ * RemoveConfigNodeProcedure} or {@link RemoveDataNodeProcedure} will not be executed at the same
+ * time.
+ */
+public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipePluginState> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CreatePipePluginProcedure.class);
+
+ private static final int RETRY_THRESHOLD = 5;
+
+ private PipePluginMeta pipePluginMeta;
+ private byte[] jarFile;
+
+ public CreatePipePluginProcedure() {
+ super();
+ }
+
+ public CreatePipePluginProcedure(PipePluginMeta pipePluginMeta, byte[] jarFile) {
+ super();
+ this.pipePluginMeta = pipePluginMeta;
+ this.jarFile = jarFile;
+ }
+
+ @Override
+ protected Flow executeFromState(ConfigNodeProcedureEnv env, CreatePipePluginState state)
+ throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+ if (pipePluginMeta == null) {
+ return Flow.NO_MORE_STATE;
+ }
+
+ try {
+ switch (state) {
+ case LOCK:
+ return executeFromLock(env);
+ case CREATE_ON_CONFIG_NODES:
+ return executeFromCreateOnConfigNodes(env);
+ case CREATE_ON_DATA_NODES:
+ return executeFromCreateOnDataNodes(env);
+ case UNLOCK:
+ return executeFromUnlock(env);
+ }
+ } catch (Exception e) {
+ if (isRollbackSupported(state)) {
+ LOGGER.error("CreatePipePluginProcedure failed in state {}, will rollback", state, e);
+ setFailure(new ProcedureException(e.getMessage()));
+ } else {
+ LOGGER.error(
+ "Retrievable error trying to create pipe plugin [{}], state: {}",
+ pipePluginMeta.getPluginName(),
+ state,
+ e);
+ if (getCycles() > RETRY_THRESHOLD) {
+ LOGGER.error(
+ "Fail to create pipe plugin [{}] after {} retries",
+ pipePluginMeta.getPluginName(),
+ getCycles());
+ setFailure(new ProcedureException(e.getMessage()));
+ }
+ }
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ private Flow executeFromLock(ConfigNodeProcedureEnv env) {
+ LOGGER.info("CreatePipePluginProcedure: executeFromLock({})", pipePluginMeta.getPluginName());
+ final PipePluginInfo pipePluginInfo =
+ env.getConfigManager().getPipeManager().getPipePluginCoordinator().getPipePluginInfo();
+
+ pipePluginInfo.acquirePipePluginInfoLock();
+
+ try {
+ pipePluginInfo.validateBeforeCreatingPipePlugin(
+ pipePluginMeta.getPluginName(), pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5());
+ } catch (PipeManagementException e) {
+ // The pipe plugin has already created, we should end the procedure
+ LOGGER.warn(
+ "Pipe plugin {} is already created, end the CreatePipePluginProcedure({})",
+ pipePluginMeta.getPluginName(),
+ pipePluginMeta.getPluginName());
+ setFailure(new ProcedureException(e.getMessage()));
+ pipePluginInfo.releasePipePluginInfoLock();
+ return Flow.NO_MORE_STATE;
+ }
+
+ setNextState(CreatePipePluginState.CREATE_ON_CONFIG_NODES);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ private Flow executeFromCreateOnConfigNodes(ConfigNodeProcedureEnv env) {
+ LOGGER.info(
+ "CreatePipePluginProcedure: executeFromCreateOnConfigNodes({})",
+ pipePluginMeta.getPluginName());
+
+ final ConfigManager configNodeManager = env.getConfigManager();
+
+ final boolean needToSaveJar =
+ configNodeManager
+ .getPipeManager()
+ .getPipePluginCoordinator()
+ .getPipePluginInfo()
+ .isJarNeededToBeSavedWhenCreatingPipePlugin(pipePluginMeta.getJarName());
+ final CreatePipePluginPlan createPluginPlan =
+ new CreatePipePluginPlan(pipePluginMeta, needToSaveJar ? new Binary(jarFile) : null);
+
+ final ConsensusWriteResponse response =
+ configNodeManager.getConsensusManager().write(createPluginPlan);
+ if (!response.isSuccessful()) {
+ throw new PipeManagementException(response.getErrorMessage());
+ }
+
+ setNextState(CreatePipePluginState.CREATE_ON_DATA_NODES);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ private Flow executeFromCreateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
+ LOGGER.info(
+ "CreatePipePluginProcedure: executeFromCreateOnDataNodes({})",
+ pipePluginMeta.getPluginName());
+
+ if (RpcUtils.squashResponseStatusList(env.createPipePluginOnDataNodes(pipePluginMeta, jarFile))
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ setNextState(CreatePipePluginState.UNLOCK);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ throw new PipeManagementException(
+ String.format(
+ "Failed to create pipe plugin instance [%s] on data nodes",
+ pipePluginMeta.getPluginName()));
+ }
+
+ private Flow executeFromUnlock(ConfigNodeProcedureEnv env) {
+ LOGGER.info("CreatePipePluginProcedure: executeFromUnlock({})", pipePluginMeta.getPluginName());
+
+ env.getConfigManager()
+ .getPipeManager()
+ .getPipePluginCoordinator()
+ .getPipePluginInfo()
+ .releasePipePluginInfoLock();
+
+ return Flow.NO_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(ConfigNodeProcedureEnv env, CreatePipePluginState state)
+ throws IOException, InterruptedException, ProcedureException {
+ switch (state) {
+ case LOCK:
+ rollbackFromLock(env);
+ break;
+ case CREATE_ON_CONFIG_NODES:
+ rollbackFromCreateOnConfigNodes(env);
+ break;
+ case CREATE_ON_DATA_NODES:
+ rollbackFromCreateOnDataNodes(env);
+ break;
+ }
+ }
+
+ private void rollbackFromLock(ConfigNodeProcedureEnv env) {
+ LOGGER.info("CreatePipePluginProcedure: rollbackFromLock({})", pipePluginMeta.getPluginName());
+
+ env.getConfigManager()
+ .getPipeManager()
+ .getPipePluginCoordinator()
+ .getPipePluginInfo()
+ .releasePipePluginInfoLock();
+ }
+
+ private void rollbackFromCreateOnConfigNodes(ConfigNodeProcedureEnv env) {
+ LOGGER.info(
+ "CreatePipePluginProcedure: rollbackFromCreateOnConfigNodes({})",
+ pipePluginMeta.getPluginName());
+
+ env.getConfigManager()
+ .getConsensusManager()
+ .write(new DropPipePluginPlan(pipePluginMeta.getPluginName()));
+ }
+
+ private void rollbackFromCreateOnDataNodes(ConfigNodeProcedureEnv env) throws ProcedureException {
+ LOGGER.info(
+ "CreatePipePluginProcedure: rollbackFromCreateOnDataNodes({})",
+ pipePluginMeta.getPluginName());
+
+ if (RpcUtils.squashResponseStatusList(
+ env.dropPipePluginOnDataNodes(pipePluginMeta.getPluginName(), false))
+ .getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new ProcedureException(
+ String.format(
+ "Failed to rollback pipe plugin [%s] on data nodes", pipePluginMeta.getPluginName()));
+ }
+ }
+
+ @Override
+ protected boolean isRollbackSupported(CreatePipePluginState state) {
+ switch (state) {
+ case LOCK:
+ case CREATE_ON_CONFIG_NODES:
+ case CREATE_ON_DATA_NODES:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ protected CreatePipePluginState getState(int stateId) {
+ return CreatePipePluginState.values()[stateId];
+ }
+
+ @Override
+ protected int getStateId(CreatePipePluginState createPipePluginState) {
+ return createPipePluginState.ordinal();
+ }
+
+ @Override
+ protected CreatePipePluginState getInitialState() {
+ return CreatePipePluginState.LOCK;
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeShort(ProcedureType.CREATE_PIPE_PLUGIN_PROCEDURE.getTypeCode());
+ super.serialize(stream);
+ pipePluginMeta.serialize(stream);
+ ReadWriteIOUtils.write(ByteBuffer.wrap(jarFile), stream);
+ }
+
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ super.deserialize(byteBuffer);
+ pipePluginMeta = PipePluginMeta.deserialize(byteBuffer);
+ jarFile = ReadWriteIOUtils.readBinary(byteBuffer).getValues();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that instanceof CreatePipePluginProcedure) {
+ CreatePipePluginProcedure thatProcedure = (CreatePipePluginProcedure) that;
+ return thatProcedure.getProcId() == getProcId()
+ && thatProcedure.getState() == this.getState()
+ && thatProcedure.pipePluginMeta.equals(pipePluginMeta);
+ }
+ return false;
+ }
+
+ @TestOnly
+ public byte[] getJarFile() {
+ return jarFile;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
new file mode 100644
index 0000000000..815782a0ac
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
+
+import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
+import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.state.pipe.plugin.DropPipePluginState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * This class extends {@link AbstractNodeProcedure} to make sure that when a {@link
+ * DropPipePluginProcedure} is executed, the {@link AddConfigNodeProcedure}, {@link
+ * RemoveConfigNodeProcedure} or {@link RemoveDataNodeProcedure} will not be executed at the same
+ * time.
+ */
+public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePluginState> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DropPipePluginProcedure.class);
+
+ private static final int RETRY_THRESHOLD = 5;
+
+ private String pluginName;
+
+ public DropPipePluginProcedure() {
+ super();
+ }
+
+ public DropPipePluginProcedure(String pluginName) {
+ super();
+ this.pluginName = pluginName;
+ }
+
+ @Override
+ protected Flow executeFromState(ConfigNodeProcedureEnv env, DropPipePluginState state)
+ throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+ if (pluginName == null) {
+ return Flow.NO_MORE_STATE;
+ }
+
+ try {
+ switch (state) {
+ case LOCK:
+ return executeFromLock(env);
+ case DROP_ON_DATA_NODES:
+ return executeFromDropOnDataNodes(env);
+ case DROP_ON_CONFIG_NODES:
+ return executeFromDropOnConfigNodes(env);
+ case UNLOCK:
+ return executeFromUnlock(env);
+ }
+ } catch (Exception e) {
+ if (isRollbackSupported(state)) {
+ LOGGER.warn("DropPipePluginProcedure failed in state {}, will rollback", state, e);
+ setFailure(new ProcedureException(e.getMessage()));
+ } else {
+ LOGGER.error(
+ "Retrievable error trying to drop pipe plugin [{}], state: {}", pluginName, state, e);
+ if (getCycles() > RETRY_THRESHOLD) {
+ LOGGER.error("Fail to drop pipe plugin [{}] after {} retries", pluginName, getCycles());
+ setFailure(new ProcedureException(e.getMessage()));
+ }
+ }
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ private Flow executeFromLock(ConfigNodeProcedureEnv env) {
+ LOGGER.info("DropPipePluginProcedure: executeFromLock({})", pluginName);
+ final PipePluginInfo pipePluginInfo =
+ env.getConfigManager().getPipeManager().getPipePluginCoordinator().getPipePluginInfo();
+
+ pipePluginInfo.acquirePipePluginInfoLock();
+
+ try {
+ pipePluginInfo.validateBeforeDroppingPipePlugin(pluginName);
+ } catch (PipeManagementException e) {
+ // if the pipe plugin is not exist, we should end the procedure
+ LOGGER.warn(e.getMessage());
+ setFailure(new ProcedureException(e.getMessage()));
+ pipePluginInfo.releasePipePluginInfoLock();
+ return Flow.NO_MORE_STATE;
+ }
+
+ env.getConfigManager().getConsensusManager().write(new DropPipePluginPlan(pluginName));
+
+ setNextState(DropPipePluginState.DROP_ON_DATA_NODES);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ private Flow executeFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
+ LOGGER.info("DropPipePluginProcedure: executeFromDropOnDataNodes({})", pluginName);
+
+ if (RpcUtils.squashResponseStatusList(env.dropPipePluginOnDataNodes(pluginName, false))
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ setNextState(DropPipePluginState.DROP_ON_CONFIG_NODES);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ throw new PipeManagementException(
+ String.format("Failed to drop pipe plugin %s on data nodes", pluginName));
+ }
+
+ private Flow executeFromDropOnConfigNodes(ConfigNodeProcedureEnv env) {
+ LOGGER.info("DropPipePluginProcedure: executeFromDropOnConfigNodes({})", pluginName);
+
+ env.getConfigManager().getConsensusManager().write(new DropPipePluginPlan(pluginName));
+
+ setNextState(DropPipePluginState.UNLOCK);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ private Flow executeFromUnlock(ConfigNodeProcedureEnv env) {
+ LOGGER.info("DropPipePluginProcedure: executeFromUnlock({})", pluginName);
+
+ env.getConfigManager()
+ .getPipeManager()
+ .getPipePluginCoordinator()
+ .getPipePluginInfo()
+ .releasePipePluginInfoLock();
+
+ return Flow.NO_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(ConfigNodeProcedureEnv env, DropPipePluginState state)
+ throws IOException, InterruptedException, ProcedureException {
+ switch (state) {
+ case LOCK:
+ rollbackFromLock(env);
+ break;
+ case DROP_ON_DATA_NODES:
+ rollbackFromDropOnDataNodes(env);
+ break;
+ case DROP_ON_CONFIG_NODES:
+ rollbackFromDropOnConfigNodes(env);
+ break;
+ }
+ }
+
+ private void rollbackFromLock(ConfigNodeProcedureEnv env) {
+ LOGGER.info("DropPipePluginProcedure: rollbackFromLock({})", pluginName);
+
+ env.getConfigManager()
+ .getPipeManager()
+ .getPipePluginCoordinator()
+ .getPipePluginInfo()
+ .releasePipePluginInfoLock();
+ }
+
+ private void rollbackFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
+ LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnDataNodes({})", pluginName);
+
+ // do nothing but wait for rolling back to the previous state: LOCK
+ // TODO: we should drop the pipe plugin on data nodes
+ }
+
+ private void rollbackFromDropOnConfigNodes(ConfigNodeProcedureEnv env) {
+ LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnConfigNodes({})", pluginName);
+
+ // do nothing but wait for rolling back to the previous state: DROP_ON_DATA_NODES
+ // TODO: we should drop the pipe plugin on config nodes
+ }
+
+ @Override
+ protected boolean isRollbackSupported(DropPipePluginState state) {
+ switch (state) {
+ case LOCK:
+ case DROP_ON_DATA_NODES:
+ case DROP_ON_CONFIG_NODES:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ protected DropPipePluginState getState(int stateId) {
+ return DropPipePluginState.values()[stateId];
+ }
+
+ @Override
+ protected int getStateId(DropPipePluginState dropPipePluginState) {
+ return dropPipePluginState.ordinal();
+ }
+
+ @Override
+ protected DropPipePluginState getInitialState() {
+ return DropPipePluginState.LOCK;
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeShort(ProcedureType.DROP_PIPE_PLUGIN_PROCEDURE.getTypeCode());
+ super.serialize(stream);
+ ReadWriteIOUtils.write(pluginName, stream);
+ }
+
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ super.deserialize(byteBuffer);
+ pluginName = ReadWriteIOUtils.readString(byteBuffer);
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that instanceof DropPipePluginProcedure) {
+ final DropPipePluginProcedure thatProcedure = (DropPipePluginProcedure) that;
+ return thatProcedure.getProcId() == getProcId()
+ && thatProcedure.getState() == getState()
+ && (thatProcedure.pluginName).equals(pluginName);
+ }
+ return false;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/plugin/CreatePipePluginState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/plugin/CreatePipePluginState.java
new file mode 100644
index 0000000000..81b7992cff
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/plugin/CreatePipePluginState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.state.pipe.plugin;
+
+public enum CreatePipePluginState {
+ LOCK,
+ CREATE_ON_CONFIG_NODES,
+ CREATE_ON_DATA_NODES,
+ UNLOCK
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/plugin/DropPipePluginState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/plugin/DropPipePluginState.java
new file mode 100644
index 0000000000..189606307d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/pipe/plugin/DropPipePluginState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.state.pipe.plugin;
+
+public enum DropPipePluginState {
+ LOCK,
+ DROP_ON_DATA_NODES,
+ DROP_ON_CONFIG_NODES,
+ UNLOCK
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 48a4cfd997..42a421891f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.confignode.procedure.impl.model.DropModelProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
+import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
@@ -117,6 +119,12 @@ public class ProcedureFactory implements IProcedureFactory {
case DROP_MODEL_PROCEDURE:
procedure = new DropModelProcedure();
break;
+ case CREATE_PIPE_PLUGIN_PROCEDURE:
+ procedure = new CreatePipePluginProcedure();
+ break;
+ case DROP_PIPE_PLUGIN_PROCEDURE:
+ procedure = new DropPipePluginProcedure();
+ break;
default:
LOGGER.error("unknown Procedure type: " + typeCode);
throw new IOException("unknown Procedure type: " + typeCode);
@@ -158,6 +166,10 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.DEACTIVATE_TEMPLATE_PROCEDURE;
} else if (procedure instanceof UnsetTemplateProcedure) {
return ProcedureType.UNSET_TEMPLATE_PROCEDURE;
+ } else if (procedure instanceof CreatePipePluginProcedure) {
+ return ProcedureType.CREATE_PIPE_PLUGIN_PROCEDURE;
+ } else if (procedure instanceof DropPipePluginProcedure) {
+ return ProcedureType.DROP_PIPE_PLUGIN_PROCEDURE;
}
return null;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index a5c9d33eb9..c5d6054646 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -59,7 +59,11 @@ public enum ProcedureType {
/** Ml Model */
CREATE_MODEL_PROCEDURE((short) 800),
- DROP_MODEL_PROCEDURE((short) 801);
+ DROP_MODEL_PROCEDURE((short) 801),
+
+ /** Pipe Plugin */
+ CREATE_PIPE_PLUGIN_PROCEDURE((short) 900),
+ DROP_PIPE_PLUGIN_PROCEDURE((short) 901);
private final short typeCode;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index ffa2c3749f..da9a7cd5e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -708,6 +708,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return configManager.getPipePluginTable();
}
+ @Override
+ public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) throws TException {
+ return configManager.getPipePluginJar(req);
+ }
+
@Override
public TSStatus merge() throws TException {
return configManager.merge();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java
new file mode 100644
index 0000000000..cda15ec592
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/CreatePipePluginProcedureTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe;
+
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class CreatePipePluginProcedureTest {
+ @Test
+ public void serializeDeserializeTest() {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+ PipePluginMeta pipePluginMeta =
+ new PipePluginMeta("test", "test.class", "test.jar", "testMD5test");
+ CreatePipePluginProcedure proc =
+ new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3});
+
+ try {
+ proc.serialize(outputStream);
+ ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ CreatePipePluginProcedure proc2 =
+ (CreatePipePluginProcedure) ProcedureFactory.getInstance().create(buffer);
+
+ assertEquals(proc, proc2);
+ assertEquals(new Binary(proc.getJarFile()), new Binary(proc2.getJarFile()));
+ } catch (Exception e) {
+ fail();
+ }
+ }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
new file mode 100644
index 0000000000..02f7c856f2
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/DropPipePluginProcedureTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe;
+
+import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class DropPipePluginProcedureTest {
+ @Test
+ public void serializeDeserializeTest() {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+
+ DropPipePluginProcedure proc = new DropPipePluginProcedure("test");
+
+ try {
+ proc.serialize(outputStream);
+ ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ DropPipePluginProcedure proc2 =
+ (DropPipePluginProcedure) ProcedureFactory.getInstance().create(buffer);
+ assertEquals(proc, proc2);
+ } catch (Exception e) {
+ fail();
+ }
+ }
+}
diff --git a/confignode/src/test/resources/confignode1conf/iotdb-common.properties b/confignode/src/test/resources/confignode1conf/iotdb-common.properties
index bc77b25874..8981b21285 100644
--- a/confignode/src/test/resources/confignode1conf/iotdb-common.properties
+++ b/confignode/src/test/resources/confignode1conf/iotdb-common.properties
@@ -24,4 +24,5 @@ schema_replication_factor=3
data_replication_factor=3
udf_lib_dir=target/confignode1/ext/udf
trigger_lib_dir=target/confignode1/ext/trigger
+pipe_lib_dir=target/confignode1/ext/pipe
config_node_ratis_log_appender_buffer_size_max = 14194304
\ No newline at end of file
diff --git a/confignode/src/test/resources/confignode2conf/iotdb-common.properties b/confignode/src/test/resources/confignode2conf/iotdb-common.properties
index 1e431a85af..a9789fedab 100644
--- a/confignode/src/test/resources/confignode2conf/iotdb-common.properties
+++ b/confignode/src/test/resources/confignode2conf/iotdb-common.properties
@@ -24,4 +24,5 @@ schema_replication_factor=3
data_replication_factor=3
udf_lib_dir=target/confignode2/ext/udf
trigger_lib_dir=target/confignode2/ext/trigger
+pipe_lib_dir=target/confignode2/ext/pipe
config_node_ratis_log_appender_buffer_size_max = 14194304
\ No newline at end of file
diff --git a/confignode/src/test/resources/confignode3conf/iotdb-common.properties b/confignode/src/test/resources/confignode3conf/iotdb-common.properties
index e784ec66d3..6a95388bd7 100644
--- a/confignode/src/test/resources/confignode3conf/iotdb-common.properties
+++ b/confignode/src/test/resources/confignode3conf/iotdb-common.properties
@@ -24,4 +24,5 @@ schema_replication_factor=3
data_replication_factor=3
udf_lib_dir=target/confignode3/ext/udf
trigger_lib_dir=target/confignode3/ext/trigger
+pipe_lib_dir=target/confignode3/ext/pipe
config_node_ratis_log_appender_buffer_size_max = 14194304
\ No newline at end of file
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 14ff363c31..0ac0c6f4ce 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -892,6 +892,15 @@ cluster_name=defaultCluster
### Continuous Query Configuration
####################
+# Uncomment the following field to configure the pipe lib directory.
+# For Window platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
+# absolute. Otherwise, it is relative.
+# pipe_lib_dir=ext\\pipe
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# pipe_lib_dir=ext/pipe
+
# The number of threads in the scheduled thread pool that submit continuous query tasks periodically
# Datatype: int
# continuous_query_submit_thread_count=2
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
index 3766af8bd1..4151af3bc2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
@@ -89,7 +89,7 @@ public class PipePluginClassLoaderManager implements IService {
private static PipePluginClassLoaderManager INSTANCE = null;
- public static synchronized PipePluginClassLoaderManager getInstance(String libRoot)
+ public static synchronized PipePluginClassLoaderManager setupAndGetInstance(String libRoot)
throws IOException {
if (INSTANCE == null) {
INSTANCE = new PipePluginClassLoaderManager(libRoot);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
index fedfa1c3d0..fbdf8684e9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
@@ -168,7 +168,7 @@ public class UDFManagementService {
UDFExecutableManager.getInstance().getInstallDir()
+ File.separator
+ udfInformation.getJarName())));
- // save the md5 in a txt under trigger temporary lib
+ // save the md5 in a txt under UDF temporary lib
UDFExecutableManager.getInstance()
.saveTextAsFileUnderTemporaryRoot(existedMd5, md5FilePath);
} catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index a0673904c1..83d295f473 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -1467,6 +1467,22 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TGetJarInListResp resp = client.getPipePluginJar(req);
+ if (!updateConfigNodeLeader(resp.getStatus())) {
+ return resp;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ waitAndReconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 47917ace72..f4ea5ba157 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -271,6 +271,13 @@ public class IoTDBConfig {
private String triggerTemporaryLibDir =
triggerDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
+ /** External lib directory for Pipe Plugin, stores user-defined JAR files */
+ private String pipeDir =
+ IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.PIPE_FOLDER_NAME;
+
+ /** External temporary lib directory for storing downloaded pipe plugin JAR files */
+ private String pipeTemporaryLibDir = pipeDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
+
/** External lib directory for ext Pipe plugins, stores user-defined JAR files */
private String extPipeDir =
IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.EXT_PIPE_FOLDER_NAME;
@@ -1412,6 +1419,23 @@ public class IoTDBConfig {
this.triggerTemporaryLibDir = triggerDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
}
+ public String getPipeDir() {
+ return pipeDir;
+ }
+
+ public void setPipeDir(String pipeDir) {
+ this.pipeDir = pipeDir;
+ updatePipeTemporaryLibDir();
+ }
+
+ public String getPipeTemporaryLibDir() {
+ return pipeTemporaryLibDir;
+ }
+
+ public void updatePipeTemporaryLibDir() {
+ this.pipeTemporaryLibDir = pipeDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
+ }
+
public String getMqttDir() {
return mqttDir;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6bd05d6f86..26e0544248 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1006,6 +1006,9 @@ public class IoTDBDescriptor {
// CQ
loadCQProps(properties);
+ // Pipe
+ loadPipeProps(properties);
+
// cluster
loadClusterProps(properties);
@@ -1787,6 +1790,10 @@ public class IoTDBDescriptor {
Integer.toString(conf.getTriggerForwardMQTTPoolSize()))));
}
+ private void loadPipeProps(Properties properties) {
+ conf.setPipeDir(properties.getProperty("pipe_lib_dir", conf.getPipeDir()));
+ }
+
private void loadCQProps(Properties properties) {
conf.setContinuousQueryThreadNum(
Integer.parseInt(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
index d820441dac..dc0565d4a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipePluginAgent.java
@@ -93,7 +93,15 @@ public class PipePluginAgent {
}
}
- private void doRegister(PipePluginMeta pipePluginMeta) throws PipeManagementException {
+ /**
+ * Register a PipePlugin to the system without any meta checks. The PipePlugin will be loaded by
+ * the PipePluginClassLoader and its instance will be created to ensure that it can be loaded.
+ *
+ * @param pipePluginMeta the meta information of the PipePlugin
+ * @throws PipeManagementException if the PipePlugin can not be loaded or its instance can not be
+ * created
+ */
+ public void doRegister(PipePluginMeta pipePluginMeta) throws PipeManagementException {
final String pluginName = pipePluginMeta.getPluginName();
final String className = pipePluginMeta.getClassName();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 7747bcdd08..c86e919e6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -30,6 +30,9 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.service.metric.MetricService;
@@ -67,6 +70,7 @@ import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
+import org.apache.iotdb.db.pipe.agent.PipePluginAgent;
import org.apache.iotdb.db.protocol.rest.RestService;
import org.apache.iotdb.db.service.metrics.DataNodeMetricsHelper;
import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
@@ -80,6 +84,7 @@ import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.InternalReporterType;
+import org.apache.iotdb.pipe.api.exception.PipeManagementException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.udf.api.exception.UDFManagementException;
@@ -312,7 +317,9 @@ public class DataNode implements DataNodeMBean {
*
* <p>4. All trigger information
*
- * <p>5. All TTL information
+ * <p>5. All Pipe information
+ *
+ * <p>6. All TTL information
*/
private void storeRuntimeConfigurations(
List<TConfigNodeLocation> configNodeLocations, TRuntimeConfiguration runtimeConfiguration) {
@@ -333,6 +340,9 @@ public class DataNode implements DataNodeMBean {
/* Store triggerInformationList */
getTriggerInformationList(runtimeConfiguration.getAllTriggerInformation());
+ /* Store pipeInformationList */
+ getPipeInformationList(runtimeConfiguration.getAllPipeInformation());
+
/* Store ttl information */
StorageEngine.getInstance().updateTTLInfo(runtimeConfiguration.getAllTTLInformation());
}
@@ -451,6 +461,7 @@ public class DataNode implements DataNodeMBean {
private void prepareResources() throws StartupException {
prepareUDFResources();
prepareTriggerResources();
+ preparePipePluginResources();
}
/** register services and set up DataNode */
@@ -638,7 +649,7 @@ public class DataNode implements DataNodeMBean {
getJarOfUDFs(curList);
}
- // create instances of triggers and do registration
+ // create instances of udf and do registration
try {
for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
UDFManagementService.getInstance().doRegister(udfInformation);
@@ -678,7 +689,7 @@ public class DataNode implements DataNodeMBean {
List<UDFInformation> res = new ArrayList<>();
for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
if (udfInformation.isUsingURI()) {
- // jar does not exist, add current triggerInformation to list
+ // jar does not exist, add current udfInformation to list
if (!UDFExecutableManager.getInstance()
.hasFileUnderInstallDir(udfInformation.getJarName())) {
res.add(udfInformation);
@@ -821,6 +832,99 @@ public class DataNode implements DataNodeMBean {
}
}
+ private void preparePipePluginResources() throws StartupException {
+ initPipePluginRelatedInstance();
+ if (resourcesInformationHolder.getPipePluginMetaList() == null
+ || resourcesInformationHolder.getPipePluginMetaList().isEmpty()) {
+ return;
+ }
+
+ // get jars from config node
+ List<PipePluginMeta> pipePluginNeedJarList = getJarListForPipePlugin();
+ int index = 0;
+ while (index < pipePluginNeedJarList.size()) {
+ List<PipePluginMeta> curList = new ArrayList<>();
+ int offset = 0;
+ while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
+ && index + offset < pipePluginNeedJarList.size()) {
+ curList.add(pipePluginNeedJarList.get(index + offset));
+ offset++;
+ }
+ index += (offset + 1);
+ getJarOfPipePlugins(curList);
+ }
+
+ // create instances of pipe plugins and do registration
+ try {
+ for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
+ PipePluginAgent.getInstance().doRegister(meta);
+ }
+ } catch (Exception e) {
+ throw new StartupException(e);
+ }
+ }
+
+ private void initPipePluginRelatedInstance() throws StartupException {
+ try {
+ PipePluginExecutableManager.setupAndGetInstance(
+ config.getPipeTemporaryLibDir(), config.getPipeDir());
+ PipePluginClassLoaderManager.setupAndGetInstance(config.getPipeDir());
+ } catch (IOException e) {
+ throw new StartupException(e);
+ }
+ }
+
+ private List<PipePluginMeta> getJarListForPipePlugin() {
+ List<PipePluginMeta> res = new ArrayList<>();
+ for (PipePluginMeta pipePluginMeta : resourcesInformationHolder.getPipePluginMetaList()) {
+ // If jar does not exist, add current pipePluginMeta to list
+ if (!PipePluginExecutableManager.getInstance()
+ .hasFileUnderInstallDir(pipePluginMeta.getJarName())) {
+ res.add(pipePluginMeta);
+ } else {
+ try {
+ // local jar has conflicts with jar on config node, add current pipePluginMeta to list
+ if (!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
+ res.add(pipePluginMeta);
+ }
+ } catch (PipeManagementException e) {
+ res.add(pipePluginMeta);
+ }
+ }
+ }
+ return res;
+ }
+
+ private void getJarOfPipePlugins(List<PipePluginMeta> pipePluginMetaList)
+ throws StartupException {
+ try (ConfigNodeClient configNodeClient =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ List<String> jarNameList =
+ pipePluginMetaList.stream().map(PipePluginMeta::getJarName).collect(Collectors.toList());
+ TGetJarInListResp resp = configNodeClient.getPipePluginJar(new TGetJarInListReq(jarNameList));
+ if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+ throw new StartupException("Failed to get pipe plugin jar from config node.");
+ }
+ List<ByteBuffer> jarList = resp.getJarList();
+ for (int i = 0; i < pipePluginMetaList.size(); i++) {
+ PipePluginExecutableManager.getInstance()
+ .saveToInstallDir(jarList.get(i), pipePluginMetaList.get(i).getJarName());
+ }
+ } catch (IOException | TException | ClientManagerException e) {
+ throw new StartupException(e);
+ }
+ }
+
+ private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
+ if (allPipeInformation != null && !allPipeInformation.isEmpty()) {
+ List<PipePluginMeta> list = new ArrayList<>();
+ for (ByteBuffer pipeInformationByteBuffer : allPipeInformation) {
+ list.add(PipePluginMeta.deserialize(pipeInformationByteBuffer));
+ }
+ resourcesInformationHolder.setPipePluginMetaList(list);
+ }
+ }
+
private void initSchemaEngine() {
long time = System.currentTimeMillis();
SchemaEngine.getInstance().init();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java b/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
index f26436c1d3..e650ca4856 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.service;
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.udf.UDFInformation;
@@ -33,6 +34,9 @@ public class ResourcesInformationHolder {
/** store the list when registering in config node for preparing trigger related resources */
private List<TriggerInformation> triggerInformationList;
+ /** store the list when registering in config node for preparing pipe plugin related resources */
+ private List<PipePluginMeta> pipePluginMetaList;
+
public static int getJarNumOfOneRpc() {
return JAR_NUM_OF_ONE_RPC;
}
@@ -52,4 +56,12 @@ public class ResourcesInformationHolder {
public void setTriggerInformationList(List<TriggerInformation> triggerInformationList) {
this.triggerInformationList = triggerInformationList;
}
+
+ public List<PipePluginMeta> getPipePluginMetaList() {
+ return pipePluginMetaList;
+ }
+
+ public void setPipePluginMetaList(List<PipePluginMeta> pipePluginMetaList) {
+ this.pipePluginMetaList = pipePluginMetaList;
+ }
}
diff --git a/server/src/test/resources/datanode1conf/iotdb-common.properties b/server/src/test/resources/datanode1conf/iotdb-common.properties
index 553d89d795..ba22251554 100644
--- a/server/src/test/resources/datanode1conf/iotdb-common.properties
+++ b/server/src/test/resources/datanode1conf/iotdb-common.properties
@@ -19,4 +19,5 @@
timestamp_precision=ms
udf_lib_dir=target/datanode1/ext/udf
-trigger_lib_dir=target/datanode1/ext/trigger
\ No newline at end of file
+trigger_lib_dir=target/datanode1/ext/trigger
+pipe_lib_dir=target/datanode1/ext/pipe
diff --git a/server/src/test/resources/datanode2conf/iotdb-common.properties b/server/src/test/resources/datanode2conf/iotdb-common.properties
index 80add439f2..d75d1b5202 100644
--- a/server/src/test/resources/datanode2conf/iotdb-common.properties
+++ b/server/src/test/resources/datanode2conf/iotdb-common.properties
@@ -19,4 +19,5 @@
timestamp_precision=ms
udf_lib_dir=target/datanode2/ext/udf
-trigger_lib_dir=target/datanode2/ext/trigger
\ No newline at end of file
+trigger_lib_dir=target/datanode2/ext/trigger
+pipe_lib_dir=target/datanode2/ext/pipe
diff --git a/server/src/test/resources/datanode3conf/iotdb-common.properties b/server/src/test/resources/datanode3conf/iotdb-common.properties
index 7a06e6d735..ebd7d4fa59 100644
--- a/server/src/test/resources/datanode3conf/iotdb-common.properties
+++ b/server/src/test/resources/datanode3conf/iotdb-common.properties
@@ -19,4 +19,5 @@
timestamp_precision=ms
udf_lib_dir=target/datanode3/ext/udf
-trigger_lib_dir=target/datanode3/ext/trigger
\ No newline at end of file
+trigger_lib_dir=target/datanode3/ext/trigger
+pipe_lib_dir=target/datanode3/ext/pipe
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index d9b379e9a1..fc866714f2 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -90,6 +90,7 @@ struct TRuntimeConfiguration {
2: required list<binary> allTriggerInformation
3: required list<binary> allUDFInformation
4: required binary allTTLInformation
+ 5: required list<binary> allPipeInformation
}
struct TDataNodeRegisterReq {
@@ -1100,6 +1101,11 @@ service IConfigNodeRPCService {
*/
TGetPipePluginTableResp getPipePluginTable();
+ /**
+ * Return the pipe plugin jar list of the plugin name list
+ */
+ TGetJarInListResp getPipePluginJar(TGetJarInListReq req)
+
// ======================================================
// Maintenance Tools
// ======================================================