You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/04/23 14:24:12 UTC
[iotdb] 02/04: 1. support built-in pipe plugins. 2. allow to create / drop the same function multiple times without errors on PipePluginAgent / PipePluginInfo to enable retry policy. 3. allow to drop a function that is not registered on leader config node so that user can have a chance to change the inconsistant registration state over different config/data nodes.
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch IOTDB-5788
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 270802123d38f6bab29027964526522b46dafe14
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Apr 23 20:44:55 2023 +0800
1. support built-in pipe plugins.
2. allow to create / drop the same function multiple times without errors on PipePluginAgent / PipePluginInfo to enable retry policy.
3. allow to drop a function that is not registered on leader config node so that user can have a chance to change the inconsistant registration state over different config/data nodes.
---
.../persistence/pipe/PipePluginInfo.java | 37 +++++++------
.../impl/pipe/plugin/DropPipePluginProcedure.java | 6 +--
.../thrift/ConfigNodeRPCServiceProcessor.java | 1 -
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 36 ++++++++++++-
.../builtin/connector/DoNothingConnector.java | 53 +++++++++++++++++-
.../builtin/processor/DoNothingProcessor.java | 49 ++++++++++++++++-
.../meta/ConfigNodePipePluginMetaKeeper.java | 25 +++------
.../plugin/meta/DataNodePipePluginMetaKeeper.java | 28 +++++++---
.../commons/pipe/plugin/meta/PipePluginMeta.java | 57 +++++++++++++-------
.../pipe/plugin/meta/PipePluginMetaKeeper.java | 62 +++++++++++++++++-----
.../config/executor/ClusterConfigTaskExecutor.java | 3 +-
.../org/apache/iotdb/db/pipe/agent/PipeAgent.java | 5 +-
.../db/pipe/agent/plugin/PipePluginAgent.java | 41 ++++++++++----
13 files changed, 311 insertions(+), 92 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 0b47dccb6c..3818070dac 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -84,6 +84,7 @@ public class PipePluginInfo implements SnapshotProcessor {
/////////////////////////////// Validator ///////////////////////////////
public void validateBeforeCreatingPipePlugin(String pluginName, String jarName, String jarMD5) {
+ // both build-in and user defined pipe plugin should be unique
if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
throw new PipeManagementException(
String.format(
@@ -100,12 +101,13 @@ public class PipePluginInfo implements SnapshotProcessor {
}
public void validateBeforeDroppingPipePlugin(String pluginName) {
- if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
- return;
+ if (pipePluginMetaKeeper.containsPipePlugin(pluginName)
+ && pipePluginMetaKeeper.getPipePluginMeta(pluginName).isBuiltin()) {
+ throw new PipeManagementException(
+ String.format(
+ "Failed to drop PipePlugin [%s], the PipePlugin is a built-in PipePlugin",
+ pluginName));
}
-
- throw new PipeManagementException(
- String.format("Failed to drop PipePlugin [%s], the PipePlugin does not exist", pluginName));
}
public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(String jarName) {
@@ -119,32 +121,37 @@ public class PipePluginInfo implements SnapshotProcessor {
/////////////////////////////// Pipe Plugin Management ///////////////////////////////
- public TSStatus createPipePlugin(CreatePipePluginPlan physicalPlan) {
+ public TSStatus createPipePlugin(CreatePipePluginPlan createPipePluginPlan) {
try {
- final PipePluginMeta pipePluginMeta = physicalPlan.getPipePluginMeta();
+ final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
+
+ // try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
+ dropPipePlugin(new DropPipePluginPlan(pipePluginMeta.getPluginName()));
+
pipePluginMetaKeeper.addPipePluginMeta(pipePluginMeta.getPluginName(), pipePluginMeta);
pipePluginMetaKeeper.addJarNameAndMd5(
pipePluginMeta.getJarName(), pipePluginMeta.getJarMD5());
- if (physicalPlan.getJarFile() != null) {
+ if (createPipePluginPlan.getJarFile() != null) {
pipePluginExecutableManager.saveToInstallDir(
- ByteBuffer.wrap(physicalPlan.getJarFile().getValues()), pipePluginMeta.getJarName());
+ ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()),
+ pipePluginMeta.getJarName());
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (Exception e) {
final String errorMessage =
String.format(
- "Failed to add PipePlugin [%s] in PipePlugin_Table on Config Nodes, because of %s",
- physicalPlan.getPipePluginMeta().getPluginName(), e);
+ "Failed to execute createPipePlugin(%s) on config nodes, because of %s",
+ createPipePluginPlan.getPipePluginMeta().getPluginName(), e);
LOGGER.warn(errorMessage, e);
return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage(errorMessage);
}
}
- public TSStatus dropPipePlugin(DropPipePluginPlan physicalPlan) {
- final String pluginName = physicalPlan.getPluginName();
+ public TSStatus dropPipePlugin(DropPipePluginPlan dropPipePluginPlan) {
+ final String pluginName = dropPipePluginPlan.getPluginName();
if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
pipePluginMetaKeeper.removeJarNameAndMd5IfPossible(
@@ -161,10 +168,10 @@ public class PipePluginInfo implements SnapshotProcessor {
Arrays.asList(pipePluginMetaKeeper.getAllPipePluginMeta()));
}
- public JarResp getPipePluginJar(GetPipePluginJarPlan physicalPlan) {
+ public JarResp getPipePluginJar(GetPipePluginJarPlan getPipePluginJarPlan) {
try {
List<ByteBuffer> jarList = new ArrayList<>();
- for (String jarName : physicalPlan.getJarNames()) {
+ for (String jarName : getPipePluginJarPlan.getJarNames()) {
jarList.add(
ExecutableManager.transferToBytebuffer(
PipePluginExecutableManager.getInstance()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 6aa4d6109d..7c8268ecd0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -110,7 +110,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
try {
pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
} catch (PipeManagementException e) {
- // if the pipe plugin is not exist, we should end the procedure
+ // if the pipe plugin is a built-in plugin, we should not drop it
LOGGER.warn(e.getMessage());
setFailure(new ProcedureException(e.getMessage()));
pipePluginCoordinator.unlock();
@@ -180,14 +180,14 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi
LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnDataNodes({})", pluginName);
// do nothing but wait for rolling back to the previous state: LOCK
- // TODO: we should drop the pipe plugin on data nodes
+ // TODO: we should drop the pipe plugin on data nodes properly with RuntimeAgent's help
}
private void rollbackFromDropOnConfigNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("DropPipePluginProcedure: rollbackFromDropOnConfigNodes({})", pluginName);
// do nothing but wait for rolling back to the previous state: DROP_ON_DATA_NODES
- // TODO: we should drop the pipe plugin on config nodes
+ // TODO: we should drop the pipe plugin on config nodes properly with RuntimeCoordinator's help
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 2e80a9b0d1..efc3541a8c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -897,7 +897,6 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
}
@Override
- @Deprecated
public TGetAllPipeInfoResp getAllPipeInfo() {
return configManager.getAllPipeInfo();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index 2e21cbf893..d6972646d4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -17,5 +17,39 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.plugin.builtin;public enum BuiltinPipePlugin {
+package org.apache.iotdb.commons.pipe.plugin.builtin;
+
+import org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
+import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
+
+public enum BuiltinPipePlugin {
+
+ // processors
+ DO_NOTHING_PROCESSOR("do_nothing_processor", DoNothingProcessor.class),
+
+ // connectors
+ DO_NOTHING_CONNECTOR("do_nothing_connector", DoNothingConnector.class),
+ ;
+
+ private final String pipePluginName;
+ private final Class<?> pipePluginClass;
+ private final String className;
+
+ BuiltinPipePlugin(String functionName, Class<?> pipePluginClass) {
+ this.pipePluginName = functionName;
+ this.pipePluginClass = pipePluginClass;
+ this.className = pipePluginClass.getName();
+ }
+
+ public String getPipePluginName() {
+ return pipePluginName;
+ }
+
+ public Class<?> getPipePluginClass() {
+ return pipePluginClass;
+ }
+
+ public String getClassName() {
+ return className;
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
index e50d3c3c1e..503005fefa 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/DoNothingConnector.java
@@ -17,5 +17,56 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.plugin.builtin.connector;public class DoNothingConnector {
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
+
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.connector.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+public class DoNothingConnector implements PipeConnector {
+
+ @Override
+ public void validate(PipeParameterValidator validator) {
+ // do nothing
+ }
+
+ @Override
+ public void customize(
+ PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) {
+ // do nothing
+ }
+
+ @Override
+ public void handshake() {
+ // do nothing
+ }
+
+ @Override
+ public void heartbeat() {
+ // do nothing
+ }
+
+ @Override
+ public void transfer(TabletInsertionEvent tabletInsertionEvent) {
+ // do nothing
+ }
+
+ @Override
+ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
+ // do nothing
+ }
+
+ @Override
+ public void transfer(DeletionEvent deletionEvent) {
+ // do nothing
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
index 572a9d365a..45634119db 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -17,5 +17,52 @@
* under the License.
*/
-package org.apache.iotdb.commons.pipe.plugin.builtin.processor;public class DoNothingProcessor {
+package org.apache.iotdb.commons.pipe.plugin.builtin.processor;
+
+import org.apache.iotdb.pipe.api.PipeProcessor;
+import org.apache.iotdb.pipe.api.collector.EventCollector;
+import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.PipeParameters;
+import org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.event.deletion.DeletionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.insertion.TsFileInsertionEvent;
+
+import java.io.IOException;
+
+public class DoNothingProcessor implements PipeProcessor {
+
+ @Override
+ public void validate(PipeParameterValidator validator) {
+ // do nothing
+ }
+
+ @Override
+ public void customize(
+ PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) {
+ // do nothing
+ }
+
+ @Override
+ public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
+ throws IOException {
+ eventCollector.collectTabletInsertionEvent(tabletInsertionEvent);
+ }
+
+ @Override
+ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
+ throws IOException {
+ eventCollector.collectTsFileInsertionEvent(tsFileInsertionEvent);
+ }
+
+ @Override
+ public void process(DeletionEvent deletionEvent, EventCollector eventCollector)
+ throws IOException {
+ eventCollector.collectDeletionEvent(deletionEvent);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
index c02fdeadb6..7c3632e4b4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/ConfigNodePipePluginMetaKeeper.java
@@ -33,7 +33,8 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
protected final Map<String, Integer> jarNameToReferenceCountMap;
public ConfigNodePipePluginMetaKeeper() {
- super();
+ loadBuiltInPlugins();
+
jarNameToMd5Map = new HashMap<>();
jarNameToReferenceCountMap = new HashMap<>();
}
@@ -67,6 +68,7 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
}
}
+ @Override
public void processTakeSnapshot(OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(jarNameToMd5Map.size(), outputStream);
for (Map.Entry<String, String> entry : jarNameToMd5Map.entrySet()) {
@@ -75,14 +77,13 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
ReadWriteIOUtils.write(jarNameToReferenceCountMap.get(entry.getKey()), outputStream);
}
- ReadWriteIOUtils.write(pipeNameToPipeMetaMap.size(), outputStream);
- for (PipePluginMeta pipePluginMeta : pipeNameToPipeMetaMap.values()) {
- ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream);
- }
+ super.processTakeSnapshot(outputStream);
}
+ @Override
public void processLoadSnapshot(InputStream inputStream) throws IOException {
- clear();
+ jarNameToMd5Map.clear();
+ jarNameToReferenceCountMap.clear();
final int jarSize = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < jarSize; i++) {
@@ -93,16 +94,6 @@ public class ConfigNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
jarNameToReferenceCountMap.put(jarName, count);
}
- final int pipePluginMetaSize = ReadWriteIOUtils.readInt(inputStream);
- for (int i = 0; i < pipePluginMetaSize; i++) {
- final PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(inputStream);
- addPipePluginMeta(pipePluginMeta.getPluginName().toUpperCase(), pipePluginMeta);
- }
- }
-
- public void clear() {
- pipeNameToPipeMetaMap.clear();
- jarNameToMd5Map.clear();
- jarNameToReferenceCountMap.clear();
+ super.processLoadSnapshot(inputStream);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java
index 64d7c8ef94..42656ad870 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/DataNodePipePluginMetaKeeper.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.pipe.plugin.meta;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
import java.util.Map;
@@ -26,28 +27,39 @@ import java.util.concurrent.ConcurrentHashMap;
public class DataNodePipePluginMetaKeeper extends PipePluginMetaKeeper {
- private final Map<String, Class<?>> pipeNameToPipeClassMap;
+ private final Map<String, Class<?>> pipePluginNameToClassMap;
public DataNodePipePluginMetaKeeper() {
- super();
- pipeNameToPipeClassMap = new ConcurrentHashMap<>();
+ pipePluginNameToClassMap = new ConcurrentHashMap<>();
+
+ loadBuiltInPlugins();
+ }
+
+ @Override
+ protected void loadBuiltInPlugins() {
+ super.loadBuiltInPlugins();
+
+ for (final BuiltinPipePlugin builtinPipePlugin : BuiltinPipePlugin.values()) {
+ addPluginAndClass(
+ builtinPipePlugin.getPipePluginName(), builtinPipePlugin.getPipePluginClass());
+ }
}
public void addPluginAndClass(String pluginName, Class<?> clazz) {
- pipeNameToPipeClassMap.put(pluginName.toUpperCase(), clazz);
+ pipePluginNameToClassMap.put(pluginName.toUpperCase(), clazz);
}
public Class<?> getPluginClass(String pluginName) {
- return pipeNameToPipeClassMap.get(pluginName.toUpperCase());
+ return pipePluginNameToClassMap.get(pluginName.toUpperCase());
}
public void removePluginClass(String pluginName) {
- pipeNameToPipeClassMap.remove(pluginName.toUpperCase());
+ pipePluginNameToClassMap.remove(pluginName.toUpperCase());
}
public void updatePluginClass(PipePluginMeta pipePluginMeta, PipePluginClassLoader classLoader)
throws ClassNotFoundException {
- Class<?> functionClass = Class.forName(pipePluginMeta.getClassName(), true, classLoader);
- pipeNameToPipeClassMap.put(pipePluginMeta.getPluginName().toUpperCase(), functionClass);
+ final Class<?> functionClass = Class.forName(pipePluginMeta.getClassName(), true, classLoader);
+ pipePluginNameToClassMap.put(pipePluginMeta.getPluginName().toUpperCase(), functionClass);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
index 08bcc614f0..5b11cd4d44 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
@@ -26,24 +26,39 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.Objects;
public class PipePluginMeta {
- private String pluginName;
+ private final String pluginName;
+ private final String className;
- private String className;
+ // jarName and jarMD5 are used to identify the jar file.
+ // they could be null if the plugin is built-in. they should be both null or both not null.
+ private final boolean isBuiltin;
+ private final String jarName;
+ private final String jarMD5;
- private String jarName;
+ public PipePluginMeta(String pluginName, String className, String jarName, String jarMD5) {
+ this.pluginName = Objects.requireNonNull(pluginName).toUpperCase();
+ this.className = Objects.requireNonNull(className);
- private String jarMD5;
+ isBuiltin = false;
+ this.jarName = Objects.requireNonNull(jarName);
+ this.jarMD5 = Objects.requireNonNull(jarMD5);
+ }
- private PipePluginMeta() {}
+ public PipePluginMeta(String pluginName, String className) {
+ this.pluginName = Objects.requireNonNull(pluginName).toUpperCase();
+ this.className = Objects.requireNonNull(className);
- public PipePluginMeta(String pluginName, String className, String jarName, String jarMD5) {
- this.pluginName = pluginName.toUpperCase();
- this.className = className;
- this.jarName = jarName;
- this.jarMD5 = jarMD5;
+ this.isBuiltin = true;
+ this.jarName = null;
+ this.jarMD5 = null;
+ }
+
+ public boolean isBuiltin() {
+ return isBuiltin;
}
public String getPluginName() {
@@ -69,6 +84,10 @@ public class PipePluginMeta {
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
+ /**
+ * All built-in plugins' information is kept in Java class {@code BuiltinPipePlugin }. So we never
+ * serialize the built-in plugins, then we don't need to serialize the isBuiltin field.
+ */
public void serialize(DataOutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(pluginName, outputStream);
ReadWriteIOUtils.write(className, outputStream);
@@ -77,12 +96,11 @@ public class PipePluginMeta {
}
public static PipePluginMeta deserialize(ByteBuffer byteBuffer) {
- PipePluginMeta pipePluginMeta = new PipePluginMeta();
- pipePluginMeta.pluginName = ReadWriteIOUtils.readString(byteBuffer);
- pipePluginMeta.className = ReadWriteIOUtils.readString(byteBuffer);
- pipePluginMeta.jarName = ReadWriteIOUtils.readString(byteBuffer);
- pipePluginMeta.jarMD5 = ReadWriteIOUtils.readString(byteBuffer);
- return pipePluginMeta;
+ final String pluginName = ReadWriteIOUtils.readString(byteBuffer);
+ final String className = ReadWriteIOUtils.readString(byteBuffer);
+ final String jarName = ReadWriteIOUtils.readString(byteBuffer);
+ final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer);
+ return new PipePluginMeta(pluginName, className, jarName, jarMD5);
}
public static PipePluginMeta deserialize(InputStream inputStream) throws IOException {
@@ -101,8 +119,9 @@ public class PipePluginMeta {
PipePluginMeta that = (PipePluginMeta) obj;
return pluginName.equals(that.pluginName)
&& className.equals(that.className)
- && jarName.equals(that.jarName)
- && jarMD5.equals(that.jarMD5);
+ && isBuiltin == that.isBuiltin
+ && Objects.equals(jarName, that.jarName)
+ && Objects.equals(jarMD5, that.jarMD5);
}
@Override
@@ -119,6 +138,8 @@ public class PipePluginMeta {
+ ", className='"
+ className
+ '\''
+ + ", isBuiltin="
+ + isBuiltin
+ ", jarName='"
+ jarName
+ '\''
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
index 4417141106..9a47fcfb89 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMetaKeeper.java
@@ -19,40 +19,78 @@
package org.apache.iotdb.commons.pipe.plugin.meta;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
public abstract class PipePluginMetaKeeper {
- protected final Map<String, PipePluginMeta> pipeNameToPipeMetaMap;
+ protected final Map<String, PipePluginMeta> pipePluginNameToMetaMap = new ConcurrentHashMap<>();
- public PipePluginMetaKeeper() {
- pipeNameToPipeMetaMap = new ConcurrentHashMap<>();
+ protected void loadBuiltInPlugins() {
+ for (final BuiltinPipePlugin builtinPipePlugin : BuiltinPipePlugin.values()) {
+ addPipePluginMeta(
+ builtinPipePlugin.getPipePluginName(),
+ new PipePluginMeta(
+ builtinPipePlugin.getPipePluginName(), builtinPipePlugin.getClassName()));
+ }
}
public void addPipePluginMeta(String pluginName, PipePluginMeta pipePluginMeta) {
- pipeNameToPipeMetaMap.put(pluginName.toUpperCase(), pipePluginMeta);
+ pipePluginNameToMetaMap.put(pluginName.toUpperCase(), pipePluginMeta);
}
public void removePipePluginMeta(String pluginName) {
- pipeNameToPipeMetaMap.remove(pluginName.toUpperCase());
+ pipePluginNameToMetaMap.remove(pluginName.toUpperCase());
}
public PipePluginMeta getPipePluginMeta(String pluginName) {
- return pipeNameToPipeMetaMap.get(pluginName.toUpperCase());
+ return pipePluginNameToMetaMap.get(pluginName.toUpperCase());
}
public PipePluginMeta[] getAllPipePluginMeta() {
- return pipeNameToPipeMetaMap.values().toArray(new PipePluginMeta[0]);
+ return pipePluginNameToMetaMap.values().toArray(new PipePluginMeta[0]);
}
public boolean containsPipePlugin(String pluginName) {
- return pipeNameToPipeMetaMap.containsKey(pluginName.toUpperCase());
+ return pipePluginNameToMetaMap.containsKey(pluginName.toUpperCase());
+ }
+
+ protected void processTakeSnapshot(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(
+ (int)
+ pipePluginNameToMetaMap.values().stream()
+ .filter(pipePluginMeta -> !pipePluginMeta.isBuiltin())
+ .count(),
+ outputStream);
+
+ for (PipePluginMeta pipePluginMeta : pipePluginNameToMetaMap.values()) {
+ if (pipePluginMeta.isBuiltin()) {
+ continue;
+ }
+ ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream);
+ }
}
- public void clear() {
- pipeNameToPipeMetaMap.clear();
+ protected void processLoadSnapshot(InputStream inputStream) throws IOException {
+ pipePluginNameToMetaMap.forEach(
+ (pluginName, pluginMeta) -> {
+ if (!pluginMeta.isBuiltin()) {
+ pipePluginNameToMetaMap.remove(pluginName);
+ }
+ });
+
+ final int pipePluginMetaSize = ReadWriteIOUtils.readInt(inputStream);
+ for (int i = 0; i < pipePluginMetaSize; i++) {
+ final PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(inputStream);
+ addPipePluginMeta(pipePluginMeta.getPluginName().toUpperCase(), pipePluginMeta);
+ }
}
@Override
@@ -64,11 +102,11 @@ public abstract class PipePluginMetaKeeper {
return false;
}
PipePluginMetaKeeper that = (PipePluginMetaKeeper) o;
- return pipeNameToPipeMetaMap.equals(that.pipeNameToPipeMetaMap);
+ return pipePluginNameToMetaMap.equals(that.pipePluginNameToMetaMap);
}
@Override
public int hashCode() {
- return Objects.hash(pipeNameToPipeMetaMap);
+ return Objects.hash(pipePluginNameToMetaMap);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 2dfbdef2c8..9f98d4979e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -172,6 +172,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkStatement;
import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
+import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -759,7 +760,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
try (PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) {
// ensure that jar file contains the class and the class is a pipe plugin
Class<?> clazz = Class.forName(createPipePluginStatement.getClassName(), true, classLoader);
- clazz.getDeclaredConstructor().newInstance();
+ PipePlugin ignored = (PipePlugin) clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
index 1c50fe5b40..a38d86f974 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/PipeAgent.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.agent;
-import org.apache.iotdb.commons.pipe.plugin.meta.DataNodePipePluginMetaKeeper;
import org.apache.iotdb.db.pipe.agent.plugin.PipePluginAgent;
import org.apache.iotdb.db.pipe.agent.runtime.PipeRuntimeAgent;
import org.apache.iotdb.db.pipe.agent.task.PipeTaskAgent;
@@ -33,9 +32,7 @@ public class PipeAgent {
/** Private constructor to prevent users from creating a new instance. */
private PipeAgent() {
- final DataNodePipePluginMetaKeeper pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
-
- pipePluginAgent = PipePluginAgent.setupAndGetInstance(pipePluginMetaKeeper);
+ pipePluginAgent = PipePluginAgent.setupAndGetInstance();
pipeTaskAgent = PipeTaskAgent.setupAndGetInstance();
pipeRuntimeAgent = PipeRuntimeAgent.setupAndGetInstance();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index d4c8359bcc..1f5f70bd3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -58,6 +58,10 @@ public class PipePluginAgent {
public void register(PipePluginMeta pipePluginMeta, ByteBuffer jarFile) throws Exception {
acquireLock();
try {
+ // try to deregister first to avoid inconsistent state
+ deregister(pipePluginMeta.getPluginName(), false);
+
+ // register process from here
checkIfRegistered(pipePluginMeta);
saveJarFileIfNeeded(pipePluginMeta.getJarName(), jarFile);
doRegister(pipePluginMeta);
@@ -73,6 +77,15 @@ public class PipePluginAgent {
return;
}
+ if (information.isBuiltin()) {
+ String errorMessage =
+ String.format(
+ "Failed to register PipePlugin %s, because the given PipePlugin name is the same as a built-in PipePlugin name.",
+ pluginName);
+ LOGGER.warn(errorMessage);
+ throw new PipeManagementException(errorMessage);
+ }
+
if (PipePluginExecutableManager.getInstance()
.hasFileUnderInstallDir(pipePluginMeta.getJarName())
&& !PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
@@ -84,6 +97,9 @@ public class PipePluginAgent {
LOGGER.warn(errMsg);
throw new PipeManagementException(errMsg);
}
+
+ // if the pipe plugin is already registered and the jar file is the same, do nothing
+ // we allow users to register the same pipe plugin multiple times without any error
}
private void saveJarFileIfNeeded(String jarName, ByteBuffer byteBuffer) throws IOException {
@@ -111,7 +127,7 @@ public class PipePluginAgent {
final Class<?> pluginClass = Class.forName(className, true, currentActiveClassLoader);
// ensure that it is a PipePlugin class
- PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
+ final PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
pipePluginMetaKeeper.addPluginAndClass(pluginName, pluginClass);
@@ -141,15 +157,21 @@ public class PipePluginAgent {
public void deregister(String pluginName, boolean needToDeleteJar) throws Exception {
acquireLock();
try {
- PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName);
- if (information == null) {
- return;
+ final PipePluginMeta information = pipePluginMetaKeeper.getPipePluginMeta(pluginName);
+
+ if (information != null && information.isBuiltin()) {
+ String errorMessage =
+ String.format("Failed to deregister builtin PipePlugin %s.", pluginName);
+ LOGGER.warn(errorMessage);
+ throw new PipeManagementException(errorMessage);
}
+ // remove anyway
pipePluginMetaKeeper.removePipePluginMeta(pluginName);
pipePluginMetaKeeper.removePluginClass(pluginName);
- if (needToDeleteJar) {
+ // if it is needed to delete jar file of the pipe plugin, delete both jar file and md5
+ if (information != null && needToDeleteJar) {
PipePluginExecutableManager.getInstance().removeFileUnderLibRoot(information.getJarName());
PipePluginExecutableManager.getInstance()
.removeFileUnderTemporaryRoot(pluginName.toUpperCase() + ".txt");
@@ -188,18 +210,17 @@ public class PipePluginAgent {
///////////////////////// Singleton Instance Holder /////////////////////////
- private PipePluginAgent(DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
- this.pipePluginMetaKeeper = pipePluginMetaKeeper;
+ private PipePluginAgent() {
+ this.pipePluginMetaKeeper = new DataNodePipePluginMetaKeeper();
}
private static class PipePluginAgentServiceHolder {
private static PipePluginAgent instance = null;
}
- public static PipePluginAgent setupAndGetInstance(
- DataNodePipePluginMetaKeeper pipePluginMetaKeeper) {
+ public static PipePluginAgent setupAndGetInstance() {
if (PipePluginAgentServiceHolder.instance == null) {
- PipePluginAgentServiceHolder.instance = new PipePluginAgent(pipePluginMetaKeeper);
+ PipePluginAgentServiceHolder.instance = new PipePluginAgent();
}
return PipePluginAgentServiceHolder.instance;
}