You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/03/04 19:02:29 UTC
[iotdb] branch master updated: [IOTDB-5598] Pipe Plugins Management: from SQL to CN (#9175)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dfdb844142 [IOTDB-5598] Pipe Plugins Management: from SQL to CN (#9175)
dfdb844142 is described below
commit dfdb84414288d4bc006deb3a9fda99c053c2d5af
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Sun Mar 5 03:02:23 2023 +0800
[IOTDB-5598] Pipe Plugins Management: from SQL to CN (#9175)
* init
* add licenses
* format code style in the antlr file
* add auth control
* pipe plugin common package structure minor adjustment
* fix build errors
---------
Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
.../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 2 +
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 27 +++-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 8 +
.../iotdb/confignode/conf/ConfigNodeConfig.java | 26 +++
.../confignode/conf/ConfigNodeDescriptor.java | 2 +
.../iotdb/confignode/manager/ConfigManager.java | 24 +++
.../apache/iotdb/confignode/manager/IManager.java | 11 ++
.../thrift/ConfigNodeRPCServiceProcessor.java | 18 +++
.../iotdb/commons/auth/entity/PrivilegeType.java | 6 +-
.../apache/iotdb/commons/conf/IoTDBConstant.java | 1 +
.../commons/pipe/plugin/meta/PipePluginMeta.java | 136 ++++++++++++++++
.../pipe/plugin/service/PipePluginClassLoader.java | 90 +++++++++++
.../service/PipePluginClassLoaderManager.java | 103 ++++++++++++
.../service/PipePluginExecutableManager.java | 50 ++++++
.../apache/iotdb/commons/service/ServiceType.java | 5 +-
.../org/apache/iotdb/db/audit/AuditLogger.java | 3 +
.../org/apache/iotdb/db/auth/AuthorityChecker.java | 6 +
.../apache/iotdb/db/client/ConfigNodeClient.java | 66 ++++++++
.../db/mpp/common/header/ColumnHeaderConstant.java | 10 ++
.../db/mpp/common/header/DatasetHeaderFactory.java | 4 +
.../plan/execution/config/ConfigTaskVisitor.java | 24 +++
.../config/executor/ClusterConfigTaskExecutor.java | 178 ++++++++++++++++++++-
.../config/executor/IConfigTaskExecutor.java | 7 +
.../config/metadata/CreatePipePluginTask.java | 42 +++++
.../config/metadata/DropPipePluginTask.java | 42 +++++
.../config/metadata/ShowPipePluginsTask.java | 80 +++++++++
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 34 +++-
.../iotdb/db/mpp/plan/statement/StatementType.java | 6 +-
.../db/mpp/plan/statement/StatementVisitor.java | 16 ++
.../metadata/CreatePipePluginStatement.java | 72 +++++++++
.../metadata/DropPipePluginStatement.java | 60 +++++++
.../metadata/ShowPipePluginsStatement.java | 52 ++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 10 +-
.../src/main/thrift/confignode.thrift | 44 +++++
34 files changed, 1244 insertions(+), 21 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index 5889464db9..11450e868c 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -131,6 +131,8 @@ keyWords
| PIPESINK
| PIPESINKS
| PIPESINKTYPE
+ | PIPEPLUGIN
+ | PIPEPLUGINS
| POLICY
| PREVIOUS
| PREVIOUSUNTILLAST
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 5f04e5406d..d805cb9b9a 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -37,12 +37,12 @@ statement
ddlStatement
: createStorageGroup | createTimeseries | createSchemaTemplate | createTimeseriesOfSchemaTemplate
- | createFunction | createTrigger | createContinuousQuery
+ | createFunction | createTrigger | createPipePlugin | createContinuousQuery
| alterTimeseries | alterStorageGroup | deleteStorageGroup | deleteTimeseries | deletePartition | deleteTimeseriesOfSchemaTemplate
- | dropFunction | dropTrigger | dropContinuousQuery | dropSchemaTemplate
+ | dropFunction | dropTrigger | dropPipePlugin | dropContinuousQuery | dropSchemaTemplate
| setTTL | unsetTTL | startTrigger | stopTrigger | setSchemaTemplate | unsetSchemaTemplate
| showStorageGroup | showDevices | showTimeseries | showChildPaths | showChildNodes
- | showFunctions | showTriggers | showContinuousQueries | showTTL | showAllTTL | showCluster | showVariables | showRegion | showDataNodes | showConfigNodes
+ | showFunctions | showTriggers | showPipePlugins | showContinuousQueries | showTTL | showAllTTL | showCluster | showVariables | showRegion | showDataNodes | showConfigNodes
| showSchemaTemplates | showNodesInSchemaTemplate
| showPathsUsingSchemaTemplate | showPathsSetSchemaTemplate
| countStorageGroup | countDevices | countTimeseries | countNodes
@@ -119,10 +119,10 @@ createTimeseriesOfSchemaTemplate
// Create Function
createFunction
- : CREATE FUNCTION udfName=identifier AS className=STRING_LITERAL uriClasue?
+ : CREATE FUNCTION udfName=identifier AS className=STRING_LITERAL uriClause?
;
-uriClasue
+uriClause
: USING URI uri
;
@@ -136,7 +136,7 @@ createTrigger
triggerEventClause
ON prefixPath
AS className=STRING_LITERAL
- uriClasue?
+ uriClause?
triggerAttributeClause?
;
@@ -156,6 +156,11 @@ triggerAttribute
: key=attributeKey operator_eq value=attributeValue
;
+// Create Pipe Plugin
+createPipePlugin
+ : CREATE PIPEPLUGIN pluginName=identifier AS className=STRING_LITERAL uriClause
+ ;
+
// Create Continuous Query
createContinuousQuery
: CREATE (CONTINUOUS QUERY | CQ) cqId=identifier
@@ -225,6 +230,11 @@ dropFunction
: DROP FUNCTION udfName=identifier
;
+// Drop Pipe Plugin
+dropPipePlugin
+ : DROP PIPEPLUGIN pluginName=identifier
+ ;
+
// Drop Trigger
dropTrigger
: DROP TRIGGER triggerName=identifier
@@ -329,6 +339,11 @@ showTriggers
: SHOW TRIGGERS
;
+// Show Pipe Plugins
+showPipePlugins
+ : SHOW PIPEPLUGINS
+ ;
+
// Show Continuous Queries
showContinuousQueries
: SHOW (CONTINUOUS QUERIES | CQS)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 24ba612b93..cdfcea1ad3 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -454,6 +454,14 @@ PIPESINKTYPE
: P I P E S I N K T Y P E
;
+PIPEPLUGIN
+ : P I P E P L U G I N
+ ;
+
+PIPEPLUGINS
+ : P I P E P L U G I N S
+ ;
+
POLICY
: P O L I C Y
;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 71cac276b7..0a725b42b5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -168,6 +168,13 @@ public class ConfigNodeConfig {
private String triggerTemporaryLibDir =
triggerDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
+ /** External lib directory for pipe, stores user-uploaded JAR files */
+ private String pipeDir =
+ IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.PIPE_FOLDER_NAME;
+
+ /** External temporary lib directory for storing downloaded pipe JAR files */
+ private String pipeTemporaryLibDir = pipeDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
+
/** Time partition interval in milliseconds */
private long timePartitionInterval = 604_800_000;
@@ -305,6 +312,8 @@ public class ConfigNodeConfig {
udfTemporaryLibDir = addHomeDir(udfTemporaryLibDir);
triggerDir = addHomeDir(triggerDir);
triggerTemporaryLibDir = addHomeDir(triggerTemporaryLibDir);
+ pipeDir = addHomeDir(pipeDir);
+ pipeTemporaryLibDir = addHomeDir(pipeTemporaryLibDir);
}
private String addHomeDir(String dir) {
@@ -620,6 +629,23 @@ public class ConfigNodeConfig {
this.triggerTemporaryLibDir = triggerDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
}
+ public String getPipeDir() {
+ return pipeDir;
+ }
+
+ public void setPipeDir(String pipeDir) {
+ this.pipeDir = pipeDir;
+ updatePipeTemporaryLibDir();
+ }
+
+ public String getPipeTemporaryLibDir() {
+ return pipeTemporaryLibDir;
+ }
+
+ public void updatePipeTemporaryLibDir() {
+ this.pipeTemporaryLibDir = pipeDir + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
+ }
+
public int getSchemaReplicationFactor() {
return schemaReplicationFactor;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 4aa8d54522..d212c6819c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -330,6 +330,8 @@ public class ConfigNodeDescriptor {
conf.setTriggerDir(properties.getProperty("trigger_lib_dir", conf.getTriggerDir()).trim());
+ conf.setPipeDir(properties.getProperty("pipe_lib_dir", conf.getPipeDir()).trim());
+
conf.setTimePartitionInterval(
Long.parseLong(
properties
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 1c3023588f..8680b11ff7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -107,6 +107,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -127,6 +128,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -1222,6 +1224,28 @@ public class ConfigManager implements IManager {
: new TGetJarInListResp(status, Collections.emptyList());
}
+ @Override
+ public TSStatus createPipePlugin(TCreatePipePluginReq req) {
+ // TODO: implement PipeManager
+ TSStatus status = confirmLeader();
+ LOGGER.info("createPipePlugin: {}", req);
+ return status;
+ }
+
+ @Override
+ public TSStatus dropPipePlugin(String pipePluginName) {
+ // TODO: implement PipeManager
+ TSStatus status = confirmLeader();
+ return status;
+ }
+
+ @Override
+ public TGetPipePluginTableResp getPipePluginTable() {
+ // TODO: implement PipeManager
+ TSStatus status = confirmLeader();
+ return new TGetPipePluginTableResp(status, Collections.emptyList());
+ }
+
@Override
public TSStatus merge() {
TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index b82577df94..9b1e1d55f6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -72,6 +73,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -417,6 +419,15 @@ public interface IManager {
/** Get Trigger jar */
TGetJarInListResp getTriggerJar(TGetJarInListReq req);
+ /** Create pipe plugin */
+ TSStatus createPipePlugin(TCreatePipePluginReq req);
+
+ /** Drop pipe plugin */
+ TSStatus dropPipePlugin(String pluginName);
+
+ /** Show pipe plugins */
+ TGetPipePluginTableResp getPipePluginTable();
+
/** Merge on all DataNodes */
TSStatus merge();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 248426f1c5..f8dc8c7efa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -98,6 +99,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
@@ -107,6 +109,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -691,6 +694,21 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return configManager.getTriggerJar(req);
}
+ @Override
+ public TSStatus createPipePlugin(TCreatePipePluginReq req) {
+ return configManager.createPipePlugin(req);
+ }
+
+ @Override
+ public TSStatus dropPipePlugin(TDropPipePluginReq req) {
+ return configManager.dropPipePlugin(req.getPluginName());
+ }
+
+ @Override
+ public TGetPipePluginTableResp getPipePluginTable() {
+ return configManager.getPipePluginTable();
+ }
+
@Override
public TSStatus merge() throws TException {
return configManager.merge();
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/PrivilegeType.java b/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/PrivilegeType.java
index 36c2ecadca..34a7b68472 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/PrivilegeType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/auth/entity/PrivilegeType.java
@@ -56,7 +56,11 @@ public enum PrivilegeType {
READ_TEMPLATE,
APPLY_TEMPLATE(true),
READ_TEMPLATE_APPLICATION,
- SHOW_CONTINUOUS_QUERIES;
+ SHOW_CONTINUOUS_QUERIES,
+ CREATE_PIPEPLUGIN,
+ DROP_PIPEPLUGIN,
+ SHOW_PIPEPLUGINS,
+ ;
private static final int PRIVILEGE_COUNT = values().length;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 628fdbf8a2..92c6a6aa77 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -223,6 +223,7 @@ public class IoTDBConstant {
public static final String EXT_FOLDER_NAME = "ext";
public static final String UDF_FOLDER_NAME = "udf";
public static final String TRIGGER_FOLDER_NAME = "trigger";
+ public static final String PIPE_FOLDER_NAME = "pipe";
public static final String TMP_FOLDER_NAME = "tmp";
public static final String MQTT_FOLDER_NAME = "mqtt";
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
new file mode 100644
index 0000000000..c4ab9ef69d
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/meta/PipePluginMeta.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.meta;
+
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class PipePluginMeta {
+ private String pluginName;
+ private String className;
+ private String pluginType;
+ private String jarName;
+ private String jarMD5;
+
+ private PipePluginMeta() {}
+
+ public PipePluginMeta(String pluginName, String className, String pluginType) {
+ this.pluginName = pluginName;
+ this.className = className;
+ this.pluginType = pluginType;
+ }
+
+ public String getPluginName() {
+ return pluginName;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public String getPluginType() {
+ return pluginType;
+ }
+
+ public String getJarName() {
+ return jarName;
+ }
+
+ public String getJarMD5() {
+ return jarMD5;
+ }
+
+ public void setPluginName(String pluginName) {
+ this.pluginName = pluginName.toUpperCase();
+ }
+
+ public void setClassName(String className) {
+ this.className = className;
+ }
+
+ public void setPluginType(String pluginType) {
+ this.pluginType = pluginType;
+ }
+
+ public void setJarName(String jarName) {
+ this.jarName = jarName;
+ }
+
+ public void setJarMD5(String jarMD5) {
+ this.jarMD5 = jarMD5;
+ }
+
+ public ByteBuffer serialize() throws IOException {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+ serialize(outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ }
+
+ public void serialize(DataOutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(pluginName, outputStream);
+ ReadWriteIOUtils.write(className, outputStream);
+ ReadWriteIOUtils.write(pluginType, outputStream);
+ ReadWriteIOUtils.write(jarName, outputStream);
+ ReadWriteIOUtils.write(jarMD5, outputStream);
+ }
+
+ public static PipePluginMeta deserialize(ByteBuffer buffer) {
+ PipePluginMeta pipePluginMeta = new PipePluginMeta();
+ pipePluginMeta.setPluginName(Objects.requireNonNull(ReadWriteIOUtils.readString(buffer)));
+ pipePluginMeta.setClassName(ReadWriteIOUtils.readString(buffer));
+ pipePluginMeta.setPluginType(ReadWriteIOUtils.readString(buffer));
+ pipePluginMeta.setJarName(ReadWriteIOUtils.readString(buffer));
+ pipePluginMeta.setJarMD5(ReadWriteIOUtils.readString(buffer));
+ return pipePluginMeta;
+ }
+
+ public static PipePluginMeta deserialize(InputStream inputStream) throws IOException {
+ return deserialize(
+ ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PipePluginMeta that = (PipePluginMeta) o;
+ return Objects.equals(pluginName, that.pluginName)
+ && Objects.equals(className, that.className)
+ && Objects.equals(pluginType, that.pluginType)
+ && Objects.equals(jarName, that.jarName)
+ && Objects.equals(jarMD5, that.jarMD5);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pluginName);
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoader.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoader.java
new file mode 100644
index 0000000000..d221e21a96
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoader.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.service;
+
+import org.apache.iotdb.commons.file.SystemFileFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@ThreadSafe
+public class PipePluginClassLoader extends URLClassLoader {
+
+ private final String libRoot;
+
+ /**
+ * If activeInstanceCount is equals to 0, it means that there is no instance using this
+ * classloader. This classloader can only be closed when activeInstanceCount is equals to 0.
+ */
+ private final AtomicLong activeInstanceCount;
+
+ /**
+ * If this classloader is marked as deprecated, then this classloader can be closed after all
+ * instances that use this classloader are closed.
+ */
+ private volatile boolean deprecated;
+
+ public PipePluginClassLoader(String libRoot) throws IOException {
+ super(new URL[0]);
+ this.libRoot = libRoot;
+ activeInstanceCount = new AtomicLong(0);
+ deprecated = false;
+ addURLs();
+ }
+
+ private void addURLs() throws IOException {
+ try (Stream<Path> pathStream =
+ Files.walk(SystemFileFactory.INSTANCE.getFile(libRoot).toPath())) {
+ // skip directory
+ for (Path path :
+ pathStream.filter(path -> !path.toFile().isDirectory()).collect(Collectors.toList())) {
+ super.addURL(path.toUri().toURL());
+ }
+ }
+ }
+
+ public synchronized void acquire() {
+ activeInstanceCount.incrementAndGet();
+ }
+
+ public synchronized void release() throws IOException {
+ activeInstanceCount.decrementAndGet();
+ closeIfPossible();
+ }
+
+ public synchronized void markAsDeprecated() throws IOException {
+ deprecated = true;
+ closeIfPossible();
+ }
+
+ private void closeIfPossible() throws IOException {
+ if (deprecated && activeInstanceCount.get() == 0) {
+ close();
+ }
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
new file mode 100644
index 0000000000..3766af8bd1
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginClassLoaderManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.service;
+
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+@NotThreadSafe
+public class PipePluginClassLoaderManager implements IService {
+
+ private final String libRoot;
+
+ /**
+ * activeClassLoader is used to load all classes under libRoot. libRoot may be updated before the
+ * user executes CREATE PIPEPLUGIN or after the user executes DROP PIPEPLUGIN. Therefore, we need
+ * to continuously maintain the activeClassLoader so that the classes it loads are always
+ * up-to-date.
+ */
+ private volatile PipePluginClassLoader activeClassLoader;
+
+ private PipePluginClassLoaderManager(String libRoot) throws IOException {
+ this.libRoot = libRoot;
+ activeClassLoader = new PipePluginClassLoader(libRoot);
+ }
+
+ public PipePluginClassLoader updateAndGetActiveClassLoader() throws IOException {
+ PipePluginClassLoader deprecatedClassLoader = activeClassLoader;
+ activeClassLoader = new PipePluginClassLoader(libRoot);
+ if (deprecatedClassLoader != null) {
+ deprecatedClassLoader.markAsDeprecated();
+ }
+ return activeClassLoader;
+ }
+
+ public PipePluginClassLoader getActiveClassLoader() {
+ return activeClassLoader;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ // IService
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public void start() throws StartupException {
+ try {
+ SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
+ activeClassLoader = new PipePluginClassLoader(libRoot);
+ } catch (IOException e) {
+ throw new StartupException(this.getID().getName(), e.getMessage());
+ }
+ }
+
+ @Override
+ public void stop() {
+ // nothing to do
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ // singleton instance holder
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+
+ private static PipePluginClassLoaderManager INSTANCE = null;
+
+ public static synchronized PipePluginClassLoaderManager getInstance(String libRoot)
+ throws IOException {
+ if (INSTANCE == null) {
+ INSTANCE = new PipePluginClassLoaderManager(libRoot);
+ }
+ return INSTANCE;
+ }
+
+ public static PipePluginClassLoaderManager getInstance() {
+ return INSTANCE;
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
new file mode 100644
index 0000000000..0466f888ee
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/service/PipePluginExecutableManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.plugin.service;
+
+import org.apache.iotdb.commons.executable.ExecutableManager;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+public class PipePluginExecutableManager extends ExecutableManager {
+
+ private static PipePluginExecutableManager INSTANCE = null;
+
+ public PipePluginExecutableManager(String temporaryLibRoot, String libRoot) {
+ super(temporaryLibRoot, libRoot);
+ }
+
+ public static synchronized PipePluginExecutableManager setupAndGetInstance(
+ String temporaryLibRoot, String libRoot) throws IOException {
+ if (INSTANCE == null) {
+ SystemFileFactory.INSTANCE.makeDirIfNecessary(temporaryLibRoot);
+ SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot);
+ SystemFileFactory.INSTANCE.makeDirIfNecessary(libRoot + File.separator + INSTALL_DIR);
+ INSTANCE = new PipePluginExecutableManager(temporaryLibRoot, libRoot);
+ }
+ return INSTANCE;
+ }
+
+ public static PipePluginExecutableManager getInstance() {
+ return INSTANCE;
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 4dcfd16a3d..101b50405f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -72,8 +72,9 @@ public enum ServiceType {
FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager"),
MPP_DATA_EXCHANGE_SERVICE("MPP Data exchange manager", "MPPDataExchangeManager"),
INTERNAL_SERVICE("Internal Service", "InternalService"),
- IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService");
-
+ IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService"),
+ PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE(
+ "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader");
private final String name;
private final String jmxName;
diff --git a/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java b/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index 3d1fe71dea..c3e05f4ffc 100644
--- a/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -186,6 +186,8 @@ public class AuditLogger {
case STOP_PIPE:
case DROP_PIPE:
case DEACTIVATE_TEMPLATE:
+ case CREATE_PIPEPLUGIN:
+ case DROP_PIPEPLUGIN:
return AuditLogOperation.DDL;
case LOAD_DATA:
case INSERT:
@@ -229,6 +231,7 @@ public class AuditLogger {
case FETCH_SCHEMA:
case COUNT:
case SHOW_TRIGGERS:
+ case SHOW_PIPEPLUGINS:
return AuditLogOperation.QUERY;
default:
logger.error("Unrecognizable operator type ({}) for audit log", type);
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index c661c27421..094328ea79 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -237,6 +237,12 @@ public class AuthorityChecker {
return PrivilegeType.READ_TEMPLATE_APPLICATION.ordinal();
case SHOW_CONTINUOUS_QUERIES:
return PrivilegeType.SHOW_CONTINUOUS_QUERIES.ordinal();
+ case CREATE_PIPEPLUGIN:
+ return PrivilegeType.CREATE_PIPEPLUGIN.ordinal();
+ case DROP_PIPEPLUGIN:
+ return PrivilegeType.DROP_PIPEPLUGIN.ordinal();
+ case SHOW_PIPEPLUGINS:
+ return PrivilegeType.SHOW_PIPEPLUGINS.ordinal();
default:
logger.error("Unrecognizable operator type ({}) for AuthorityChecker.", type);
return -1;
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 7ebe15a043..a0673904c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -64,6 +65,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
@@ -73,6 +75,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -1401,6 +1404,69 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TSStatus createPipePlugin(TCreatePipePluginReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.createPipePlugin(req);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ logger.warn(
+ "Failed to connect to ConfigNode {} from DataNode {} when executing {}",
+ configNode,
+ config.getAddressAndPort(),
+ Thread.currentThread().getStackTrace()[1].getMethodName());
+ configLeader = null;
+ }
+ waitAndReconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
+ @Override
+ public TSStatus dropPipePlugin(TDropPipePluginReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.dropPipePlugin(req);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ logger.warn(
+ "Failed to connect to ConfigNode {} from DataNode {} when executing {}",
+ configNode,
+ config.getAddressAndPort(),
+ Thread.currentThread().getStackTrace()[1].getMethodName());
+ configLeader = null;
+ }
+ waitAndReconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
+ @Override
+ public TGetPipePluginTableResp getPipePluginTable() throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TGetPipePluginTableResp resp = client.getPipePluginTable();
+ if (!updateConfigNodeLeader(resp.getStatus())) {
+ return resp;
+ }
+ } catch (TException e) {
+ logger.warn(
+ "Failed to connect to ConfigNode {} from DataNode {} when executing {}",
+ configNode,
+ config.getAddressAndPort(),
+ Thread.currentThread().getStackTrace()[1].getMethodName());
+ configLeader = null;
+ }
+ waitAndReconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index baaf9a773c..61713b5e34 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -96,6 +96,10 @@ public class ColumnHeaderConstant {
public static final String PATH_PATTERN = "PathPattern";
public static final String CLASS_NAME = "ClassName";
+ // column names for show pipe plugins statement
+ public static final String PLUGIN_NAME = "PluginName";
+ public static final String PLUGIN_TYPE = "PluginType";
+
// show cluster status
public static final String NODE_TYPE_CONFIG_NODE = "ConfigNode";
public static final String NODE_TYPE_DATA_NODE = "DataNode";
@@ -325,6 +329,12 @@ public class ColumnHeaderConstant {
new ColumnHeader(CLASS_NAME, TSDataType.TEXT),
new ColumnHeader(NODE_ID, TSDataType.TEXT));
+ public static final List<ColumnHeader> showPipePluginsColumnHeaders =
+ ImmutableList.of(
+ new ColumnHeader(PLUGIN_NAME, TSDataType.TEXT),
+ new ColumnHeader(PLUGIN_TYPE, TSDataType.TEXT),
+ new ColumnHeader(CLASS_NAME, TSDataType.TEXT));
+
public static final List<ColumnHeader> showSchemaTemplateHeaders =
ImmutableList.of(new ColumnHeader(TEMPLATE_NAME, TSDataType.TEXT));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index 7e8eb19b05..e47e4370ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -99,6 +99,10 @@ public class DatasetHeaderFactory {
return new DatasetHeader(ColumnHeaderConstant.showTriggersColumnHeaders, true);
}
+ public static DatasetHeader getShowPipePluginsHeader() {
+ return new DatasetHeader(ColumnHeaderConstant.showPipePluginsColumnHeaders, true);
+ }
+
public static DatasetHeader getShowRegionHeader() {
return new DatasetHeader(ColumnHeaderConstant.showRegionColumnHeaders, true);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index b5d71185e6..8aa4efdfac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -22,12 +22,14 @@ package org.apache.iotdb.db.mpp.plan.execution.config;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CountStorageGroupTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreateContinuousQueryTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreateFunctionTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreatePipePluginTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreateTriggerTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DatabaseSchemaTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DeleteStorageGroupTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DeleteTimeSeriesTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropContinuousQueryTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropFunctionTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropPipePluginTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropTriggerTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.GetRegionIdTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.GetSeriesSlotListTask;
@@ -40,6 +42,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowConfigNodesTas
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowContinuousQueriesTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowDataNodesTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowFunctionsTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowPipePluginsTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowRegionTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowStorageGroupTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTTLTask;
@@ -75,12 +78,14 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreatePipePluginStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropPipePluginStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetSeriesSlotListStatement;
@@ -92,6 +97,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowConfigNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowContinuousQueriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDataNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowFunctionsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowPipePluginsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowRegionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement;
@@ -269,6 +275,24 @@ public class ConfigTaskVisitor
return new ShowTriggersTask();
}
+ @Override
+ public IConfigTask visitCreatePipePlugin(
+ CreatePipePluginStatement createPipePluginStatement, TaskContext context) {
+ return new CreatePipePluginTask(createPipePluginStatement);
+ }
+
+ @Override
+ public IConfigTask visitDropPipePlugin(
+ DropPipePluginStatement dropPipePluginStatement, TaskContext context) {
+ return new DropPipePluginTask(dropPipePluginStatement);
+ }
+
+ @Override
+ public IConfigTask visitShowPipePlugins(
+ ShowPipePluginsStatement showPipePluginStatement, TaskContext context) {
+ return new ShowPipePluginsTask();
+ }
+
@Override
public IConfigTask visitShowRegion(ShowRegionStatement showRegionStatement, TaskContext context) {
return new ShowRegionTask(showRegionStatement);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index a079c260e4..14838989c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -32,12 +32,15 @@ import org.apache.iotdb.commons.executable.ExecutableManager;
import org.apache.iotdb.commons.executable.ExecutableResource;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoader;
+import org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFClassLoader;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -47,8 +50,10 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -109,6 +114,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreatePipePluginStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
@@ -336,7 +342,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
future.setException(
new IoTDBException(
"URI is empty, please specify the URI.",
- TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode()));
+ TSStatusCode.UDF_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
jarFileName = new File(uriString).getName();
@@ -346,7 +352,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
future.setException(
new IoTDBException(
"The scheme of URI is not set, please specify the scheme of URI.",
- TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode()));
+ TSStatusCode.UDF_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
if (!uri.getScheme().equals("file")) {
@@ -470,7 +476,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
getUDFTableResp.getStatus().message, getUDFTableResp.getStatus().code));
return future;
}
- // convert triggerTable and buildTsBlock
+ // convert UDFTable and buildTsBlock
ShowFunctionsTask.buildTsBlock(getUDFTableResp.getAllUDFInformation(), future);
} catch (ClientManagerException | TException e) {
future.setException(e);
@@ -507,7 +513,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
future.setException(
new IoTDBException(
"URI is empty, please specify the URI.",
- TSStatusCode.UDF_DOWNLOAD_ERROR.getStatusCode()));
+ TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode()));
return future;
}
jarFileName = new File(uriString).getName();
@@ -654,6 +660,170 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
return future;
}
+ @Override
+ public SettableFuture<ConfigTaskResult> createPipePlugin(
+ CreatePipePluginStatement createPipePluginStatement) {
+ final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ final String pluginName = createPipePluginStatement.getPluginName();
+ final String className = createPipePluginStatement.getClassName();
+ final String uriString = createPipePluginStatement.getUriString();
+
+ if (uriString == null || uriString.isEmpty()) {
+ future.setException(
+ new IoTDBException(
+ "Failed to create pipe plugin, because the URI is empty.",
+ TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+ return future;
+ }
+
+ try (final ConfigNodeClient client =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ String libRoot;
+ ByteBuffer jarFile;
+ String jarMd5;
+
+ final String jarFileName = new File(uriString).getName();
+ try {
+ URI uri = new URI(uriString);
+ if (uri.getScheme() == null) {
+ future.setException(
+ new IoTDBException(
+ "The scheme of URI is not set, please specify the scheme of URI.",
+ TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+ return future;
+ }
+ if (!uri.getScheme().equals("file")) {
+ // Download executable
+ ExecutableResource resource =
+ PipePluginExecutableManager.getInstance()
+ .request(Collections.singletonList(uriString));
+ String jarFilePathUnderTempDir =
+ PipePluginExecutableManager.getInstance()
+ .getDirStringUnderTempRootByRequestId(resource.getRequestId())
+ + File.separator
+ + jarFileName;
+ // libRoot should be the path of the specified jar
+ libRoot = jarFilePathUnderTempDir;
+ jarFile = ExecutableManager.transferToBytebuffer(jarFilePathUnderTempDir);
+ jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(jarFilePathUnderTempDir)));
+ } else {
+ // libRoot should be the path of the specified jar
+ libRoot = new File(new URI(uriString)).getAbsolutePath();
+ // If jarPath is a file path on datanode, we transfer it to ByteBuffer and send it to
+ // ConfigNode.
+ jarFile = ExecutableManager.transferToBytebuffer(libRoot);
+ // Set md5 of the jar file
+ jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
+ }
+ } catch (IOException | URISyntaxException e) {
+ LOGGER.warn(
+ "Failed to get executable for PipePlugin({}) using URI: {}, the cause is: {}",
+ createPipePluginStatement.getPluginName(),
+ createPipePluginStatement.getUriString(),
+ e);
+ future.setException(
+ new IoTDBException(
+ "Failed to get executable for PipePlugin"
+ + createPipePluginStatement.getPluginName()
+ + "', please check the URI.",
+ TSStatusCode.PIPE_PLUGIN_DOWNLOAD_ERROR.getStatusCode()));
+ return future;
+ }
+
+ // try to create instance, this request will fail if creation is not successful
+ try (PipePluginClassLoader classLoader = new PipePluginClassLoader(libRoot)) {
+ // ensure that jar file contains the class and the class is a pipe plugin
+ Class<?> clazz = Class.forName(createPipePluginStatement.getClassName(), true, classLoader);
+ clazz.getDeclaredConstructor().newInstance();
+ } catch (ClassNotFoundException
+ | NoSuchMethodException
+ | InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException
+ | ClassCastException e) {
+ LOGGER.warn(
+ "Failed to create function when try to create PipePlugin({}) instance first, the cause is: {}",
+ createPipePluginStatement.getPluginName(),
+ e);
+ future.setException(
+ new IoTDBException(
+ "Failed to load class '"
+ + createPipePluginStatement.getClassName()
+ + "', because it's not found in jar file: "
+ + createPipePluginStatement.getUriString(),
+ TSStatusCode.PIPE_PLUGIN_LOAD_CLASS_ERROR.getStatusCode()));
+ return future;
+ }
+
+ final TSStatus executionStatus =
+ client.createPipePlugin(
+ new TCreatePipePluginReq()
+ .setPluginName(pluginName)
+ .setClassName(className)
+ .setJarFile(jarFile)
+ .setJarMD5(jarMd5)
+ .setJarName(
+ String.format(
+ "%s-%s.%s",
+ jarFileName.substring(0, jarFileName.lastIndexOf(".")),
+ jarMd5,
+ jarFileName.substring(jarFileName.lastIndexOf(".") + 1))));
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+ LOGGER.warn(
+ "Failed to create PipePlugin {}({}) because {}",
+ pluginName,
+ className,
+ executionStatus.getMessage());
+ future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (ClientManagerException | TException | IOException e) {
+ future.setException(e);
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> dropPipePlugin(String pluginName) {
+ final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try (final ConfigNodeClient client =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ final TSStatus executionStatus = client.dropPipePlugin(new TDropPipePluginReq(pluginName));
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+ LOGGER.warn("[{}] Failed to drop pipe plugin {}.", executionStatus, pluginName);
+ future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (ClientManagerException | TException e) {
+ future.setException(e);
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> showPipePlugins() {
+ final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try (final ConfigNodeClient client =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ TGetPipePluginTableResp getPipePluginTableResp = client.getPipePluginTable();
+ if (getPipePluginTableResp.getStatus().getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.setException(
+ new IoTDBException(
+ getPipePluginTableResp.getStatus().message,
+ getPipePluginTableResp.getStatus().code));
+ return future;
+ }
+ // convert PipePluginTable and buildTsBlock
+ ShowFunctionsTask.buildTsBlock(getPipePluginTableResp.getAllPipePluginInformation(), future);
+ } catch (ClientManagerException | TException e) {
+ future.setException(e);
+ }
+ return future;
+ }
+
@Override
public SettableFuture<ConfigTaskResult> setTTL(SetTTLStatement setTTLStatement, String taskName) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 03ac9fa524..08260d55a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreatePipePluginStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
@@ -86,6 +87,12 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> showTriggers();
+ SettableFuture<ConfigTaskResult> createPipePlugin(CreatePipePluginStatement createPipeStatement);
+
+ SettableFuture<ConfigTaskResult> dropPipePlugin(String pluginName);
+
+ SettableFuture<ConfigTaskResult> showPipePlugins();
+
SettableFuture<ConfigTaskResult> setTTL(SetTTLStatement setTTLStatement, String taskName);
SettableFuture<ConfigTaskResult> merge(boolean onCluster);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreatePipePluginTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreatePipePluginTask.java
new file mode 100644
index 0000000000..91c0d68dbb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreatePipePluginTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
+
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreatePipePluginStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class CreatePipePluginTask implements IConfigTask {
+
+ private final CreatePipePluginStatement createPipePluginStatement;
+
+ public CreatePipePluginTask(CreatePipePluginStatement createPipePluginStatement) {
+ this.createPipePluginStatement = createPipePluginStatement;
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+ throws InterruptedException {
+ return configTaskExecutor.createPipePlugin(createPipePluginStatement);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropPipePluginTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropPipePluginTask.java
new file mode 100644
index 0000000000..a73bfff76b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropPipePluginTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
+
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropPipePluginStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class DropPipePluginTask implements IConfigTask {
+
+ private final String pluginName;
+
+ public DropPipePluginTask(DropPipePluginStatement dropPipePluginStatement) {
+ this.pluginName = dropPipePluginStatement.getPluginName();
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+ throws InterruptedException {
+ return configTaskExecutor.dropPipePlugin(pluginName);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java
new file mode 100644
index 0000000000..bab468ee77
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowPipePluginsTask.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
+
+import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ShowPipePluginsTask implements IConfigTask {
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+ throws InterruptedException {
+ return configTaskExecutor.showPipePlugins();
+ }
+
+ public static void buildTsBlock(
+ List<ByteBuffer> allPipePluginsInformation, SettableFuture<ConfigTaskResult> future) {
+ final List<PipePluginMeta> pipePluginMetaList = new ArrayList<>();
+ if (allPipePluginsInformation != null) {
+ for (final ByteBuffer pipePluginInformationByteBuffer : allPipePluginsInformation) {
+ pipePluginMetaList.add(PipePluginMeta.deserialize(pipePluginInformationByteBuffer));
+ }
+ }
+ pipePluginMetaList.sort(Comparator.comparing(PipePluginMeta::getPluginName));
+
+ final List<TSDataType> outputDataTypes =
+ ColumnHeaderConstant.showPipePluginsColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
+ final TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+ for (final PipePluginMeta pipePluginMeta : pipePluginMetaList) {
+ builder.getTimeColumnBuilder().writeLong(0L);
+ builder.getColumnBuilder(0).writeBinary(Binary.valueOf(pipePluginMeta.getPluginName()));
+ builder.getColumnBuilder(1).writeBinary(Binary.valueOf(pipePluginMeta.getPluginType()));
+ builder.getColumnBuilder(2).writeBinary(Binary.valueOf(pipePluginMeta.getClassName()));
+ builder.declarePosition();
+ }
+
+ future.set(
+ new ConfigTaskResult(
+ TSStatusCode.SUCCESS_STATUS,
+ builder.build(),
+ DatasetHeaderFactory.getShowPipePluginsHeader()));
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index c5ac196776..b31ccbd50f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -106,6 +106,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreatePipePluginStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement;
@@ -113,6 +114,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStateme
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropPipePluginStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetSeriesSlotListStatement;
@@ -127,6 +129,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowContinuousQueriesStat
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDataNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowFunctionsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowPipePluginsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowRegionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement;
@@ -713,13 +716,13 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
// Create Function
@Override
public Statement visitCreateFunction(CreateFunctionContext ctx) {
- if (ctx.uriClasue() == null) {
+ if (ctx.uriClause() == null) {
return new CreateFunctionStatement(
parseIdentifier(ctx.udfName.getText()),
parseStringLiteral(ctx.className.getText()),
false);
} else {
- String uriString = parseAndValidateURI(ctx.uriClasue());
+ String uriString = parseAndValidateURI(ctx.uriClause());
return new CreateFunctionStatement(
parseIdentifier(ctx.udfName.getText()),
parseStringLiteral(ctx.className.getText()),
@@ -728,7 +731,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
}
}
- private String parseAndValidateURI(IoTDBSqlParser.UriClasueContext ctx) {
+ private String parseAndValidateURI(IoTDBSqlParser.UriClauseContext ctx) {
String uriString = parseStringLiteral(ctx.uri().getText());
try {
URI uri = new URI(uriString);
@@ -768,7 +771,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
parseAttributeValue(triggerAttributeContext.value));
}
}
- if (ctx.uriClasue() == null) {
+ if (ctx.uriClause() == null) {
return new CreateTriggerStatement(
parseIdentifier(ctx.triggerName.getText()),
parseStringLiteral(ctx.className.getText()),
@@ -781,7 +784,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
parsePrefixPath(ctx.prefixPath()),
attributes);
} else {
- String uriString = parseAndValidateURI(ctx.uriClasue());
+ String uriString = parseAndValidateURI(ctx.uriClause());
return new CreateTriggerStatement(
parseIdentifier(ctx.triggerName.getText()),
parseStringLiteral(ctx.className.getText()),
@@ -808,6 +811,27 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
return new ShowTriggersStatement();
}
+ // Create PipePlugin =====================================================================
+ @Override
+ public Statement visitCreatePipePlugin(IoTDBSqlParser.CreatePipePluginContext ctx) {
+ return new CreatePipePluginStatement(
+ parseIdentifier(ctx.pluginName.getText()),
+ parseStringLiteral(ctx.className.getText()),
+ parseAndValidateURI(ctx.uriClause()));
+ }
+
+ // Drop PipePlugin =====================================================================
+ @Override
+ public Statement visitDropPipePlugin(IoTDBSqlParser.DropPipePluginContext ctx) {
+ return new DropPipePluginStatement(parseIdentifier(ctx.pluginName.getText()));
+ }
+
+ // Show PipePlugins =====================================================================
+ @Override
+ public Statement visitShowPipePlugins(IoTDBSqlParser.ShowPipePluginsContext ctx) {
+ return new ShowPipePluginsStatement();
+ }
+
// Show Child Paths =====================================================================
@Override
public Statement visitShowChildPaths(IoTDBSqlParser.ShowChildPathsContext ctx) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
index c4d4fdfeec..df70af6c4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementType.java
@@ -152,5 +152,9 @@ public enum StatementType {
DEACTIVATE_TEMPLATE,
INTERNAL_BATCH_ACTIVATE_TEMPLATE,
- INTERNAL_CREATE_MULTI_TIMESERIES
+ INTERNAL_CREATE_MULTI_TIMESERIES,
+
+ CREATE_PIPEPLUGIN,
+ DROP_PIPEPLUGIN,
+ SHOW_PIPEPLUGINS,
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 9cac1d0646..c9304ece7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesSt
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreatePipePluginStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DatabaseSchemaStatement;
@@ -49,6 +50,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStateme
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropPipePluginStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetSeriesSlotListStatement;
@@ -63,6 +65,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowContinuousQueriesStat
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDataNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowFunctionsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowPipePluginsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowRegionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement;
@@ -214,6 +217,19 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(showTriggersStatement, context);
}
+ // Pipe Plugin
+ public R visitCreatePipePlugin(CreatePipePluginStatement createPipePluginStatement, C context) {
+ return visitStatement(createPipePluginStatement, context);
+ }
+
+ public R visitDropPipePlugin(DropPipePluginStatement dropPipePluginStatement, C context) {
+ return visitStatement(dropPipePluginStatement, context);
+ }
+
+ public R visitShowPipePlugins(ShowPipePluginsStatement showPipePluginsStatement, C context) {
+ return visitStatement(showPipePluginsStatement, context);
+ }
+
/** Data Manipulation Language (DML) */
// Select Statement
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreatePipePluginStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreatePipePluginStatement.java
new file mode 100644
index 0000000000..332ee48c41
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreatePipePluginStatement.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class CreatePipePluginStatement extends Statement implements IConfigStatement {
+
+ private final String pluginName;
+ private final String className;
+ private final String uriString;
+
+ public CreatePipePluginStatement(String pluginName, String className, String uriString) {
+ super();
+ statementType = StatementType.CREATE_PIPEPLUGIN;
+ this.pluginName = pluginName;
+ this.className = className;
+ this.uriString = uriString;
+ }
+
+ public String getPluginName() {
+ return pluginName;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public String getUriString() {
+ return uriString;
+ }
+
+ @Override
+ public QueryType getQueryType() {
+ return QueryType.WRITE;
+ }
+
+ @Override
+ public List<? extends PartialPath> getPaths() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitCreatePipePlugin(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropPipePluginStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropPipePluginStatement.java
new file mode 100644
index 0000000000..a5d883962e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropPipePluginStatement.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class DropPipePluginStatement extends Statement implements IConfigStatement {
+
+ private final String pluginName;
+
+ public DropPipePluginStatement(String pluginName) {
+ super();
+ statementType = StatementType.DROP_PIPEPLUGIN;
+ this.pluginName = pluginName;
+ }
+
+ public String getPluginName() {
+ return pluginName;
+ }
+
+ @Override
+ public QueryType getQueryType() {
+ return QueryType.WRITE;
+ }
+
+ @Override
+ public List<? extends PartialPath> getPaths() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitDropPipePlugin(this, context);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowPipePluginsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowPipePluginsStatement.java
new file mode 100644
index 0000000000..c5466b3e15
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowPipePluginsStatement.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ShowPipePluginsStatement extends ShowStatement implements IConfigStatement {
+
+ public ShowPipePluginsStatement() {
+ super();
+ statementType = StatementType.SHOW_PIPEPLUGINS;
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitShowPipePlugins(this, context);
+ }
+
+ @Override
+ public QueryType getQueryType() {
+ return QueryType.READ;
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ return Collections.emptyList();
+ }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 1702db68fe..b089d9248e 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -184,8 +184,14 @@ public enum TSStatusCode {
CQ_UPDATE_LAST_EXEC_TIME_ERROR(1403),
// ML Model
- CREATE_MODEL_ERROR(1400),
- DROP_MODEL_ERROR(1401);
+ CREATE_MODEL_ERROR(1500),
+ DROP_MODEL_ERROR(1501),
+
+ // Pipe Plugin
+ CREATE_PIPE_PLUGIN_ERROR(1600),
+ DROP_PIPE_PLUGIN_ERROR(1601),
+ PIPE_PLUGIN_LOAD_CLASS_ERROR(1602),
+ PIPE_PLUGIN_DOWNLOAD_ERROR(1603);
private final int statusCode;
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index f27532fac5..72797d0119 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -435,6 +435,25 @@ struct TGetDataNodeLocationsResp {
2: required list<common.TDataNodeLocation> dataNodeLocationList
}
+// Pipe Plugin
+struct TCreatePipePluginReq {
+ 1: required string pluginName
+ 2: required string className
+ 4: required string jarName
+ 5: required binary jarFile
+ 6: required string jarMD5
+}
+
+struct TDropPipePluginReq {
+ 1: required string pluginName
+}
+
+// Get PipePlugin table from config node
+struct TGetPipePluginTableResp {
+ 1: required common.TSStatus status
+ 2: required list<binary> allPipePluginInformation
+}
+
// Show cluster
struct TShowClusterResp {
1: required common.TSStatus status
@@ -1055,6 +1074,31 @@ service IConfigNodeRPCService {
*/
TGetJarInListResp getTriggerJar(TGetJarInListReq req)
+ // ======================================================
+ // Pipe Plugin
+ // ======================================================
+
+ /**
+ * Create a pipe plugin on the specified DataNode
+ *
+ * @return SUCCESS_STATUS if the pipe plugin was created successfully
+ * EXECUTE_STATEMENT_ERROR if operations on any node failed
+ */
+ common.TSStatus createPipePlugin(TCreatePipePluginReq req)
+
+ /**
+ * Remove a pipe plugin on the DataNodes
+ *
+ * @return SUCCESS_STATUS if the pipe plugin was removed successfully
+ * EXECUTE_STATEMENT_ERROR if operations on any node failed
+ */
+ common.TSStatus dropPipePlugin(TDropPipePluginReq req)
+
+ /**
+ * Return the pipe plugin table
+ */
+ TGetPipePluginTableResp getPipePluginTable();
+
// ======================================================
// Maintenance Tools
// ======================================================