You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/04/24 03:11:47 UTC
[iotdb] branch master updated: [IOTDB-5788] Built-in pipe plug-in management mechanism (#9680)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e3e83fe8aa [IOTDB-5788] Built-in pipe plug-in management mechanism (#9680)
e3e83fe8aa is described below
commit e3e83fe8aae3733bc27845ade7ffa20df08c9a04
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Apr 24 11:11:40 2023 +0800
[IOTDB-5788] Built-in pipe plug-in management mechanism (#9680)
1. support built-in pipe plugins. do_nothing_processor & do_nothing_connector are added as built-in plugins.
2. allow to create / drop the same function multiple times without errors at PipePluginAgent / PipePluginInfo to adapt the retry policy of the Procedure mechanism.
3. allow to drop a function that is not registered on leader config node so that user can have a chance to change the inconsistant registration state over different config / data nodes.
4. fix ci build error: https://ci-builds.apache.org/job/IoTDB/job/IoTDB-pip-new/job/master/173/console
------
1. 支持内置 Pipe 插件,为内置插件适配了增、删、启动流程
2. 幂等操作支持:允许在 PipePluginAgent / PipePluginInfo 层面对同一个插件进行多次增、删(不对已经注册的插件进行检查,允许 procedure 的多次执行)
3. 创建:仅仅在 coordinator 层面检查是否合规。删除:coordinator 不对是否创建过进行检查,允许用户多次 drop pipe plugin 来消除不一致状态(即使该插件已经在 leader confignode 上被 drop)
4. 修复 ci build error: https://ci-builds.apache.org/job/IoTDB/job/IoTDB-pip-new/job/master/173/console
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 3 +-
.../manager/pipe/PipePluginCoordinator.java | 2 +-
.../persistence/pipe/PipePluginInfo.java | 37 ++++++-----
.../impl/pipe/plugin/DropPipePluginProcedure.java | 6 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 1 -
.../request/ConfigPhysicalPlanSerDeTest.java | 2 +-
.../iotdb/confignode/persistence/PipeInfoTest.java | 2 +-
.../pipe/plugin/CreatePipePluginProcedureTest.java | 2 +-
.../iotdb/it/env/cluster/AbstractNodeWrapper.java | 1 +
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 55 +++++++++++++++++
.../builtin/connector/DoNothingConnector.java | 72 ++++++++++++++++++++++
.../builtin/processor/DoNothingProcessor.java | 68 ++++++++++++++++++++
.../meta/ConfigNodePipePluginMetaKeeper.java | 25 +++-----
.../plugin/meta/DataNodePipePluginMetaKeeper.java | 28 ++++++---
.../commons/pipe/plugin/meta/PipePluginMeta.java | 65 +++++++++++++------
.../pipe/plugin/meta/PipePluginMetaKeeper.java | 62 +++++++++++++++----
.../db/mpp/common/header/ColumnHeaderConstant.java | 2 +
.../config/executor/ClusterConfigTaskExecutor.java | 3 +-
.../config/metadata/ShowPipePluginsTask.java | 18 +++++-
.../org/apache/iotdb/db/pipe/agent/PipeAgent.java | 5 +-
.../db/pipe/agent/plugin/PipePluginAgent.java | 41 +++++++++---
.../java/org/apache/iotdb/db/service/DataNode.java | 6 ++
22 files changed, 408 insertions(+), 98 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index c145d4fe21..5da25971ce 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -52,6 +52,8 @@ ddlStatement
| createFunction | dropFunction | showFunctions
// Trigger
| createTrigger | dropTrigger | showTriggers | startTrigger | stopTrigger
+ // Pipe Plugin
+ | createPipePlugin | dropPipePlugin | showPipePlugins
// CQ
| createContinuousQuery | dropContinuousQuery | showContinuousQueries
// Cluster
@@ -490,7 +492,6 @@ showPipePlugins
: SHOW PIPEPLUGINS
;
-
// ML Model =========================================================================================
// ---- Create Model
createModel
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
index 91da899463..8943a28e8c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/PipePluginCoordinator.java
@@ -69,7 +69,7 @@ public class PipePluginCoordinator {
final String jarName = req.getJarName();
final String jarMD5 = req.getJarMD5();
final PipePluginMeta pipePluginMeta =
- new PipePluginMeta(pluginName, className, jarName, jarMD5);
+ new PipePluginMeta(pluginName, className, false, jarName, jarMD5);
return configManager.getProcedureManager().createPipePlugin(pipePluginMeta, req.getJarFile());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 0b47dccb6c..3818070dac 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -84,6 +84,7 @@ public class PipePluginInfo implements SnapshotProcessor {
/////////////////////////////// Validator ///////////////////////////////
public void validateBeforeCreatingPipePlugin(String pluginName, String jarName, String jarMD5) {
+ // both build-in and user defined pipe plugin should be unique
if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
throw new PipeManagementException(
String.format(
@@ -100,12 +101,13 @@ public class PipePluginInfo implements SnapshotProcessor {
}
public void validateBeforeDroppingPipePlugin(String pluginName) {
- if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
- return;
+ if (pipePluginMetaKeeper.containsPipePlugin(pluginName)
+ && pipePluginMetaKeeper.getPipePluginMeta(pluginName).isBuiltin()) {
+ throw new PipeManagementException(
+ String.format(
+ "Failed to drop PipePlugin [%s], the PipePlugin is a built-in PipePlugin",
+ pluginName));
}
-
- throw new PipeManagementException(
- String.format("Failed to drop PipePlugin [%s], the PipePlugin does not exist", pluginName));
}
public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(String jarName) {
@@ -119,32 +121,37 @@ public class PipePluginInfo implements SnapshotProcessor {
/////////////////////////////// Pipe Plugin Management ///////////////////////////////
- public TSStatus createPipePlugin(CreatePipePluginPlan physicalPlan) {
+ public TSStatus createPipePlugin(CreatePipePluginPlan createPipePluginPlan) {
try {
- final PipePluginMeta pipePluginMeta = physicalPlan.getPipePluginMeta();
+ final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
+
+ // try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
+ dropPipePlugin(new DropPipePluginPlan(pipePluginMeta.getPluginName()));
+
pipePluginMetaKeeper.addPipePluginMeta(pipePluginMeta.getPluginName(), pipePluginMeta);
pipePluginMetaKeeper.addJarNameAndMd5(
pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5());
- if (physicalPlan.getJarFile() != null) {
+ if (createPipePluginPlan.getJarFile() != null) {
pipePluginExecutableManager.saveToInstallDir(
- ByteBuffer.wrap(physicalPlan.getJarFile().getValues()), pipePluginMeta.getJarName());
+ ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
+ pipePluginMeta.getJarName());
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (Exception e) {
final String errorMessage =
String.format(
- "Failed to add PipePlugin [%s] in PipePlugin_Table on Config Nodes, because of %s",
- physicalPlan.getPipePluginMeta().getPluginName(), e);
+ "Failed to execute createPipePlugin(%s) on config nodes, because of %s",
+ createPipePluginPlan.getPipePluginMeta().getPluginName(), e);
LOGGER.warn(errorMessage, e);
return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage(errorMessage);
}
}
- public TSStatus dropPipePlugin(DropPipePluginPlan physicalPlan) {
- final String pluginName = physicalPlan.getPluginName();
+ public TSStatus dropPipePlugin(DropPipePluginPlan dropPipePluginPlan) {
+ final String pluginName = dropPipePluginPlan.getPluginName();
if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
pipePluginMetaKeeper.removeJarNameAndMd5IfPossible(
@@ -161,10 +168,10 @@ public class PipePluginInfo implements SnapshotProcessor {
Arrays.asList(pipePluginMetaKeeper.getAllPipePluginMeta()));
}
- public JarResp getPipePluginJar(GetPipePluginJarPlan physicalPlan) {
+ public JarResp getPipePluginJar(GetPipePluginJarPlan getPipePluginJarPlan) {
try {
List<ByteBuffer> jarList = new ArrayList<>();
- for (String jarName : physicalPlan.getJarNames()) {
+ for (String jarName : getPipePluginJarPlan.getJarNames()) {
jarList.add(
ExecutableManager.transferToBytebuffer(
PipePluginExecutableManager.getInstance()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 6aa4d6109d..7c8268ecd0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -110,7 +110,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
try {
pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
} catch (PipeManagementException e) {
- // if the pipe plugin is not exist, we should end the procedure
+ // if the pipe plugin is a built-in plugin, we should not drop it
LOGGER.warn(e.getMessage());
setFailure(new ProcedureException(e.getMessage()));
pipePluginCoordinator.unlock();
@@ -180,14 +180,14 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnDataNodes({})", pluginName);
// do nothing but wait for rolling back to the previous state: LOCK
- // TODO: we should drop the pipe plugin on data nodes
+ // TODO: we should drop the pipe plugin on data nodes properly with RuntimeAgent's help
}
private void rollbackFromDropOnConfigNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnConfigNodes({})", pluginName);
// do nothing but wait for rolling back to the previous state: DROP_ON_DATA_NODES
- // TODO: we should drop the pipe plugin on config nodes
+ // TODO: we should drop the pipe plugin on config nodes properly with RuntimeCoordinator's help
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 5413818646..04ddd4a019 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -903,7 +903,6 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
}
@Override
- @Deprecated
public TGetAllPipeInfoResp getAllPipeInfo() {
return configManager.getAllPipeInfo();
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 675858f1e0..55a2e622aa 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -1098,7 +1098,7 @@ public class ConfigPhysicalPlanSerDeTest {
public void CreatePipePluginPlanTest() throws IOException {
CreatePipePluginPlan createPipePluginPlan =
new CreatePipePluginPlan(
- new PipePluginMeta("testPlugin", "org.apache.iotdb.testJar", "testJar", "???"),
+ new PipePluginMeta("testPlugin", "org.apache.iotdb.TestJar", false, "test.jar", "???"),
new Binary("123"));
CreatePipePluginPlan createPipePluginPlan1 =
(CreatePipePluginPlan)
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index 9babc2d872..0246759c98 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -84,7 +84,7 @@ public class PipeInfoTest {
CreatePipePluginPlan createPipePluginPlan =
new CreatePipePluginPlan(
- new PipePluginMeta("testPlugin", "org.apache.iotdb.testJar", "testJar", "???"),
+ new PipePluginMeta("testPlugin", "org.apache.iotdb.TestJar", false, "test.jar", "???"),
new Binary("123"));
pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
index 729e3a01f7..103ca7963f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
@@ -39,7 +39,7 @@ public class CreatePipePluginProcedureTest {
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
PipePluginMeta pipePluginMeta =
- new PipePluginMeta("test", "test.class", "test.jar", "testMD5test");
+ new PipePluginMeta("test", "test.class", false, "test.jar", "testMD5test");
CreatePipePluginProcedure proc =
new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3});
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
index a7bd8513cb..62a859bd45 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
@@ -127,6 +127,7 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
// these properties can't be mutated.
immutableCommonProperties.setProperty("udf_lib_dir", MppBaseConfig.NULL_VALUE);
immutableCommonProperties.setProperty("trigger_lib_dir", MppBaseConfig.NULL_VALUE);
+ immutableCommonProperties.setProperty("pipe_lib_dir", MppBaseConfig.NULL_VALUE);
immutableCommonProperties.setProperty("mqtt_host", MppBaseConfig.NULL_VALUE);
immutableCommonProperties.setProperty("mqtt_port", MppBaseConfig.NULL_VALUE);
immutableCommonProperties.setProperty("rest_service_port", MppBaseConfig.NULL_VALUE);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
new file mode 100644
index 0000000000..d6972646d4
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin;
+
+import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
+
+public enum BuiltinPipePlugin {
+
+ // processors
+ DO_NOTHING_PROCESSOR("do_nothing_processor", DoNothingProcessor.class),
+
+ // connectors
+ DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class),
+ ;
+
+ private final String pipePluginName;
+ private final Class<?> pipePluginClass;
+ private final String className;
+
+ BuiltinPipePlugin(String functionName, Class<?> pipePluginClass) {
+ this.pipePluginName = functionName;
+ this.pipePluginClass = pipePluginClass;
+ this.className = pipePluginClass.getName();
+ }
+
+ public String getPipePluginName() {
+ return pipePluginName;
+ }
+
+ public Class<?> getPipePluginClass() {
+ return pipePluginClass;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
new file mode 100644
index 0000000000..503005fefa
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
+
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+public class DoNothingConnector implements PipeConnector {
+
+ @Override
+ public void validate(PipeParameterValidator validator) {
+ // do nothing
+ }
+
+ @Override
+ public void customize(
+ PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) {
+ // do nothing
+ }
+
+ @Override
+ public void handshake() {
+ // do nothing
+ }
+
+ @Override
+ public void heartbeat() {
+ // do nothing
+ }
+
+ @Override
+ public void transfer(TabletInsertionEvent tabletInsertionEvent) {
+ // do nothing
+ }
+
+ @Override
+ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
+ // do nothing
+ }
+
+ @Override
+ public void transfer(DeletionEvent deletionEvent) {
+ // do nothing
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
new file mode 100644
index 0000000000..45634119db
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+import java.io.IOException;
+
+public class DoNothingProcessor implements PipeProcessor {
+
+ @Override
+ public void validate(PipeParameterValidator validator) {
+ // do nothing
+ }
+
+ @Override
+ public void customize(
+ PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) {
+ // do nothing
+ }
+
+ @Override
+ public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+ throws IOException {
+ eventCollector.collectTabletInsertionEvent(tabletInsertionEvent);
+ }
+
+ @Override
+ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
+ throws IOException {
+ eventCollector.collectTsFileInsertionEvent(tsFileInsertionEvent);
+ }
+
+ @Override
+ public void process(DeletionEvent deletionEvent, EventCollector eventCollector)
+ throws IOException {
+ eventCollector.collectDeletionEvent(deletionEvent);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
index c02fdeadb6..7c3632e4b4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
@@ -33,7 +33,8 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
protected final Map<String, Integer> jarNameToReferenceCountMap;
public ConfigNodePipePluginMetaKeeper() {
- super();
+ loadBuiltInPlugins();
+
jarNameToMd5Map = new HashMap<>();
jarNameToReferenceCountMap = new HashMap<>();
}
@@ -67,6 +68,7 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
}
}
+ @Override
public void processTakeSnapshot(OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(jarNameToMd5Map.size(), outputStream);
for (Map.Entry<String, String> entry : jarNameToMd5Map.entrySet()) {
@@ -75,14 +77,13 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
ReadWriteIOUtils.write(jarNameToReferenceCountMap.get(entry.getKey()), outputStream);
}
- ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), outputStream);
- for (PipePluginMeta pipePluginMeta : pipeNameToPipeMetaMap.values()) {
- ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream);
- }
+ super.processTakeSnapshot(outputStream);
}
+ @Override
public void processLoadSnapshot(InputStream inputStream) throws IOException {
- clear();
+ jarNameToMd5Map.clear();
+ jarNameToReferenceCountMap.clear();
final int jarSize = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < jarSize; i++) {
@@ -93,16 +94,6 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
jarNameToReferenceCountMap.put(jarName, count);
}
- final int pipePluginMetaSize = ReadWriteIOUtils.readInt(inputStream);
- for (int i = 0; i < pipePluginMetaSize; i++) {
- final PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(inputStream);
- addPipePluginMeta(pipePluginMeta.getPluginName().toUpperCase(), pipePluginMeta);
- }
- }
-
- public void clear() {
- pipeNameToPipeMetaMap.clear();
- jarNameToMd5Map.clear();
- jarNameToReferenceCountMap.clear();
+ super.processLoadSnapshot(inputStream);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java
index 64d7c8ef94..42656ad870 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.pipe.plugin.meta;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
import java.util.Map;
@@ -26,28 +27,39 @@ import java.util.concurrent.ConcurrentHashMap;
public class DataNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
- private final Map<String, Class<?>> pipeNameToPipeClassMap;
+ private final Map<String, Class<?>> pipePluginNameToClassMap;
public DataNodePipePluginMetaKeeper() {
- super();
- pipeNameToPipeClassMap = new ConcurrentHashMap<>();
+ pipePluginNameToClassMap = new ConcurrentHashMap<>();
+
+ loadBuiltInPlugins();
+ }
+
+ @Override
+ protected void loadBuiltInPlugins() {
+ super.loadBuiltInPlugins();
+
+ for (final BuiltinPipePlugin builtinPipePlugin : BuiltinPipePlugin.values()) {
+ addPluginAndClass(
+ builtinPipePlugin.getPipePluginName(), builtinPipePlugin.getPipePluginClass());
+ }
}
public void addPluginAndClass(String pluginName, Class<?> clazz) {
- pipeNameToPipeClassMap.put(pluginName.toUpperCase(), clazz);
+ pipePluginNameToClassMap.put(pluginName.toUpperCase(), clazz);
}
public Class<?> getPluginClass(String pluginName) {
- return pipeNameToPipeClassMap.get(pluginName.toUpperCase());
+ return pipePluginNameToClassMap.get(pluginName.toUpperCase());
}
public void removePluginClass(String pluginName) {
- pipeNameToPipeClassMap.remove(pluginName.toUpperCase());
+ pipePluginNameToClassMap.remove(pluginName.toUpperCase());
}
public void updatePluginClass(PipePluginMeta pipePluginMeta, PipePluginClassLoader classLoader)
throws ClassNotFoundException {
- Class<?> functionClass = Class.forName(pipePluginMeta.getClassName(), true, classLoader);
- pipeNameToPipeClassMap.put(pipePluginMeta.getPluginName().toUpperCase(), functionClass);
+ final Class<?> functionClass = Class.forName(pipePluginMeta.getClassName(), true, classLoader);
+ pipePluginNameToClassMap.put(pipePluginMeta.getPluginName().toUpperCase(), functionClass);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
index 08bcc614f0..517d5f6384 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
@@ -26,24 +26,45 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.Objects;
public class PipePluginMeta {
- private String pluginName;
-
- private String className;
-
- private String jarName;
+ private final String pluginName;
+ private final String className;
+
+ // jarName and jarMD5 are used to identify the jar file.
+ // they could be null if the plugin is built-in. they should be both null or both not null.
+ private final boolean isBuiltin;
+ private final String jarName;
+ private final String jarMD5;
+
+ public PipePluginMeta(
+ String pluginName, String className, boolean isBuiltin, String jarName, String jarMD5) {
+ this.pluginName = Objects.requireNonNull(pluginName).toUpperCase();
+ this.className = Objects.requireNonNull(className);
+
+ this.isBuiltin = isBuiltin;
+ if (isBuiltin) {
+ this.jarName = jarName;
+ this.jarMD5 = jarMD5;
+ } else {
+ this.jarName = Objects.requireNonNull(jarName);
+ this.jarMD5 = Objects.requireNonNull(jarMD5);
+ }
+ }
- private String jarMD5;
+ public PipePluginMeta(String pluginName, String className) {
+ this.pluginName = Objects.requireNonNull(pluginName).toUpperCase();
+ this.className = Objects.requireNonNull(className);
- private PipePluginMeta() {}
+ this.isBuiltin = true;
+ this.jarName = null;
+ this.jarMD5 = null;
+ }
- public PipePluginMeta(String pluginName, String className, String jarName, String jarMD5) {
- this.pluginName = pluginName.toUpperCase();
- this.className = className;
- this.jarName = jarName;
- this.jarMD5 = jarMD5;
+ public boolean isBuiltin() {
+ return isBuiltin;
}
public String getPluginName() {
@@ -72,17 +93,18 @@ public class PipePluginMeta {
public void serialize(DataOutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(pluginName, outputStream);
ReadWriteIOUtils.write(className, outputStream);
+ ReadWriteIOUtils.write(isBuiltin, outputStream);
ReadWriteIOUtils.write(jarName, outputStream);
ReadWriteIOUtils.write(jarMD5, outputStream);
}
public static PipePluginMeta deserialize(ByteBuffer byteBuffer) {
- PipePluginMeta pipePluginMeta = new PipePluginMeta();
- pipePluginMeta.pluginName = ReadWriteIOUtils.readString(byteBuffer);
- pipePluginMeta.className = ReadWriteIOUtils.readString(byteBuffer);
- pipePluginMeta.jarName = ReadWriteIOUtils.readString(byteBuffer);
- pipePluginMeta.jarMD5 = ReadWriteIOUtils.readString(byteBuffer);
- return pipePluginMeta;
+ final String pluginName = ReadWriteIOUtils.readString(byteBuffer);
+ final String className = ReadWriteIOUtils.readString(byteBuffer);
+ final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer);
+ final String jarName = ReadWriteIOUtils.readString(byteBuffer);
+ final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer);
+ return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5);
}
public static PipePluginMeta deserialize(InputStream inputStream) throws IOException {
@@ -101,8 +123,9 @@ public class PipePluginMeta {
PipePluginMeta that = (PipePluginMeta) obj;
return pluginName.equals(that.pluginName)
&& className.equals(that.className)
- && jarName.equals(that.jarName)
- && jarMD5.equals(that.jarMD5);
+ && isBuiltin == that.isBuiltin
+ && Objects.equals(jarName, that.jarName)
+ && Objects.equals(jarMD5, that.jarMD5);
}
@Override
@@ -119,6 +142,8 @@ public class PipePluginMeta {
+ ", className='"
+ className
+ '\''
+ + ", isBuiltin="
+ + isBuiltin
+ ", jarName='"
+ jarName
+ '\''
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
index 4417141106..9a47fcfb89 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
@@ -19,40 +19,78 @@
package org.apache.iotdb.commons.pipe.plugin.meta;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
public abstract class PipePluginMetaKeeper {
- protected final Map<String, PipePluginMeta> pipeNameToPipeMetaMap;
+ protected final Map<String, PipePluginMeta> pipePluginNameToMetaMap = new ConcurrentHashMap<>();
- public PipePluginMetaKeeper() {
- pipeNameToPipeMetaMap = new ConcurrentHashMap<>();
+ protected void loadBuiltInPlugins() {
+ for (final BuiltinPipePlugin builtinPipePlugin : BuiltinPipePlugin.values()) {
+ addPipePluginMeta(
+ builtinPipePlugin.getPipePluginName(),
+ new PipePluginMeta(
+ builtinPipePlugin.getPipePluginName(), builtinPipePlugin.getClassName()));
+ }
}
public void addPipePluginMeta(String pluginName, PipePluginMeta pipePluginMeta) {
- pipeNameToPipeMetaMap.put(pluginName.toUpperCase(), pipePluginMeta);
+ pipePluginNameToMetaMap.put(pluginName.toUpperCase(), pipePluginMeta);
}
public void removePipePluginMeta(String pluginName) {
- pipeNameToPipeMetaMap.remove(pluginName.toUpperCase());
+ pipePluginNameToMetaMap.remove(pluginName.toUpperCase());
}
public PipePluginMeta getPipePluginMeta(String pluginName) {
- return pipeNameToPipeMetaMap.get(pluginName.toUpperCase());
+ return pipePluginNameToMetaMap.get(pluginName.toUpperCase());
}
public PipePluginMeta[] getAllPipePluginMeta() {
- return pipeNameToPipeMetaMap.values().toArray(new PipePluginMeta[0]);
+ return pipePluginNameToMetaMap.values().toArray(new PipePluginMeta[0]);
}
public boolean containsPipePlugin(String pluginName) {
- return pipeNameToPipeMetaMap.containsKey(pluginName.toUpperCase());
+ return pipePluginNameToMetaMap.containsKey(pluginName.toUpperCase());
+ }
+
+ protected void processTakeSnapshot(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(
+ (int)
+ pipePluginNameToMetaMap.values().stream()
+ .filter(pipePluginMeta -> !pipePluginMeta.isBuiltin())
+ .count(),
+ outputStream);
+
+ for (PipePluginMeta pipePluginMeta : pipePluginNameToMetaMap.values()) {
+ if (pipePluginMeta.isBuiltin()) {
+ continue;
+ }
+ ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream);
+ }
}
- public void clear() {
- pipeNameToPipeMetaMap.clear();
+ protected void processLoadSnapshot(InputStream inputStream) throws IOException {
+ pipePluginNameToMetaMap.forEach(
+ (pluginName, pluginMeta) -> {
+ if (!pluginMeta.isBuiltin()) {
+ pipePluginNameToMetaMap.remove(pluginName);
+ }
+ });
+
+ final int pipePluginMetaSize = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < pipePluginMetaSize; i++) {
+ final PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(inputStream);
+ addPipePluginMeta(pipePluginMeta.getPluginName().toUpperCase(), pipePluginMeta);
+ }
}
@Override
@@ -64,11 +102,11 @@ public abstract class PipePluginMetaKeeper {
return false;
}
PipePluginMetaKeeper that = (PipePluginMetaKeeper) o;
- return pipeNameToPipeMetaMap.equals(that.pipeNameToPipeMetaMap);
+ return pipePluginNameToMetaMap.equals(that.pipePluginNameToMetaMap);
}
@Override
public int hashCode() {
- return Objects.hash(pipeNameToPipeMetaMap);
+ return Objects.hash(pipePluginNameToMetaMap);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index bab90568f3..7f2e41bf82 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -98,6 +98,7 @@ public class ColumnHeaderConstant {
// column names for show pipe plugins statement
public static final String PLUGIN_NAME = "PluginName";
+ public static final String PLUGIN_TYPE = "PluginType";
public static final String PLUGIN_JAR = "PluginJar";
// show cluster status
@@ -354,6 +355,7 @@ public class ColumnHeaderConstant {
public static final List<ColumnHeader> showPipePluginsColumnHeaders =
ImmutableList.of(
new ColumnHeader(PLUGIN_NAME, TSDataType.TEXT),
+ new ColumnHeader(PLUGIN_TYPE, TSDataType.TEXT),
new ColumnHeader(CLASS_NAME, TSDataType.TEXT),
new ColumnHeader(PLUGIN_JAR, TSDataType.TEXT));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 577b5b0f86..cecd844275 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -175,6 +175,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
+import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -762,7 +763,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
try (PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) {
// ensure that jar file contains the class and the class is a pipe plugin
Class<?> clazz = Class.forName(createPipePluginStatement.getClassName(), true, classLoader);
- clazz.getDeclaredConstructor().newInstance();
+ PipePlugin ignored = (PipePlugin) clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java
index a2900d6d97..d2bdcc9fee 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java
@@ -42,6 +42,11 @@ import java.util.stream.Collectors;
public class ShowPipePluginsTask implements IConfigTask {
+ private static final Binary PIPE_PLUGIN_TYPE_BUILTIN = Binary.valueOf("Builtin");
+ private static final Binary PIPE_PLUGIN_TYPE_EXTERNAL = Binary.valueOf("External");
+
+ private static final Binary PIPE_JAR_NAME_EMPTY_FIELD = Binary.valueOf("");
+
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
throws InterruptedException {
@@ -66,8 +71,17 @@ public class ShowPipePluginsTask implements IConfigTask {
for (final PipePluginMeta pipePluginMeta : pipePluginMetaList) {
builder.getTimeColumnBuilder().writeLong(0L);
builder.getColumnBuilder(0).writeBinary(Binary.valueOf(pipePluginMeta.getPluginName()));
- builder.getColumnBuilder(1).writeBinary(Binary.valueOf(pipePluginMeta.getClassName()));
- builder.getColumnBuilder(2).writeBinary(Binary.valueOf(pipePluginMeta.getJarName()));
+ builder
+ .getColumnBuilder(1)
+ .writeBinary(
+ pipePluginMeta.isBuiltin() ? PIPE_PLUGIN_TYPE_BUILTIN : PIPE_PLUGIN_TYPE_EXTERNAL);
+ builder.getColumnBuilder(2).writeBinary(Binary.valueOf(pipePluginMeta.getClassName()));
+ builder
+ .getColumnBuilder(3)
+ .writeBinary(
+ pipePluginMeta.getJarName() == null
+ ? PIPE_JAR_NAME_EMPTY_FIELD
+ : Binary.valueOf(pipePluginMeta.getJarName()));
builder.declarePosition();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index 1c50fe5b40..a38d86f974 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.agent;
-import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
@@ -33,9 +32,7 @@ public class PipeAgent {
/** Private constructor to prevent users from creating a new instance. */
private PipeAgent() {
- final DataNodePipePluginMetaKeeper pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
-
- pipePluginAgent = PipePluginAgent.setupAndGetInstance(pipePluginMetaKeeper);
+ pipePluginAgent = PipePluginAgent.setupAndGetInstance();
pipeTaskAgent = PipeTaskAgent.setupAndGetInstance();
pipeRuntimeAgent = PipeRuntimeAgent.setupAndGetInstance();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index d4c8359bcc..1f5f70bd3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -58,6 +58,10 @@ public class PipePluginAgent {
public void register(PipePluginMeta pipePluginMeta, ByteBuffer jarFile) throws Exception {
acquireLock();
try {
+ // try to deregister first to avoid inconsistent state
+ deregister(pipePluginMeta.getPluginName(), false);
+
+ // register process from here
checkIfRegistered(pipePluginMeta);
saveJarFileIfNeeded(pipePluginMeta.getJarName(), jarFile);
doRegister(pipePluginMeta);
@@ -73,6 +77,15 @@ public class PipePluginAgent {
return;
}
+ if (information.isBuiltin()) {
+ String errorMessage =
+ String.format(
+ "Failed to register PipePlugin %s, because the given PipePlugin name is the same as a built-in PipePlugin name.",
+ pluginName);
+ LOGGER.warn(errorMessage);
+ throw new PipeManagementException(errorMessage);
+ }
+
if (PipePluginExecutableManager.getInstance()
.hasFileUnderInstallDir(pipePluginMeta.getJarName())
&& !PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
@@ -84,6 +97,9 @@ public class PipePluginAgent {
LOGGER.warn(errMsg);
throw new PipeManagementException(errMsg);
}
+
+ // if the pipe plugin is already registered and the jar file is the same, do nothing
+ // we allow users to register the same pipe plugin multiple times without any error
}
private void saveJarFileIfNeeded(String jarName, ByteBuffer byteBuffer) throws IOException {
@@ -111,7 +127,7 @@ public class PipePluginAgent {
final Class<?> pluginClass = Class.forName(className, true, currentActiveClassLoader);
// ensure that it is a PipePlugin class
- PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
+ final PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
pipePluginMetaKeeper.addPluginAndClass(pluginName, pluginClass);
@@ -141,15 +157,21 @@ public class PipePluginAgent {
public void deregister(String pluginName, boolean needToDeleteJar) throws Exception {
acquireLock();
try {
- PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName);
- if (information == null) {
- return;
+ final PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName);
+
+ if (information != null && information.isBuiltin()) {
+ String errorMessage =
+ String.format("Failed to deregister builtin PipePlugin %s.", pluginName);
+ LOGGER.warn(errorMessage);
+ throw new PipeManagementException(errorMessage);
}
+ // remove anyway
pipePluginMetaKeeper.removePipePluginMeta(pluginName);
pipePluginMetaKeeper.removePluginClass(pluginName);
- if (needToDeleteJar) {
+ // if it is needed to delete jar file of the pipe plugin, delete both jar file and md5
+ if (information != null && needToDeleteJar) {
PipePluginExecutableManager.getInstance().removeFileUnderLibRoot(information.getJarName());
PipePluginExecutableManager.getInstance()
.removeFileUnderTemporaryRoot(pluginName.toUpperCase() + ".txt");
@@ -188,18 +210,17 @@ public class PipePluginAgent {
///////////////////////// Singleton Instance Holder /////////////////////////
- private PipePluginAgent(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
- this.pipePluginMetaKeeper = pipePluginMetaKeeper;
+ private PipePluginAgent() {
+ this.pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
}
private static class PipePluginAgentServiceHolder {
private static PipePluginAgent instance = null;
}
- public static PipePluginAgent setupAndGetInstance(
- DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+ public static PipePluginAgent setupAndGetInstance() {
if (PipePluginAgentServiceHolder.instance == null) {
- PipePluginAgentServiceHolder.instance = new PipePluginAgent(pipePluginMetaKeeper);
+ PipePluginAgentServiceHolder.instance = new PipePluginAgent();
}
return PipePluginAgentServiceHolder.instance;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 365eae6310..3e0f39ac4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -861,6 +861,9 @@ public class DataNode implements DataNodeMBean {
// create instances of pipe plugins and do registration
try {
for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
+ if (meta.isBuiltin()) {
+ continue;
+ }
PipeAgent.plugin().doRegister(meta);
}
} catch (Exception e) {
@@ -881,6 +884,9 @@ public class DataNode implements DataNodeMBean {
private List<PipePluginMeta> getJarListForPipePlugin() {
List<PipePluginMeta> res = new ArrayList<>();
for (PipePluginMeta pipePluginMeta : resourcesInformationHolder.getPipePluginMetaList()) {
+ if (pipePluginMeta.isBuiltin()) {
+ continue;
+ }
// If jar does not exist, add current pipePluginMeta to list
if (!PipePluginExecutableManager.getInstance()
.hasFileUnderInstallDir(pipePluginMeta.getJarName())) {