You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/05/26 02:29:04 UTC
[iotdb] branch master updated: [IOTDB-3227] UDF Management in MPP Cluster: CREATE FUNCTION (#6014)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d7d69709f1 [IOTDB-3227] UDF Management in MPP Cluster: CREATE FUNCTION (#6014)
d7d69709f1 is described below
commit d7d69709f11f3293a9a2da109c29fe89eb0ff041
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu May 26 10:28:59 2022 +0800
[IOTDB-3227] UDF Management in MPP Cluster: CREATE FUNCTION (#6014)
New Syntax: CREATE FUNCTION function-name AS class-name USING uris
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 6 +-
.../resources/conf/iotdb-confignode.properties | 28 ++-
.../confignode/client/AsyncDataNodeClientPool.java | 16 ++
.../client/handlers/CreateFunctionHandler.java | 58 ++++++
.../iotdb/confignode/conf/ConfigNodeConf.java | 36 +++-
.../confignode/conf/ConfigNodeDescriptor.java | 5 +-
.../consensus/request/ConfigRequest.java | 4 +
.../consensus/request/ConfigRequestType.java | 1 +
.../consensus/request/write/CreateFunctionReq.java | 85 +++++++++
.../iotdb/confignode/manager/ConfigManager.java | 21 ++-
.../apache/iotdb/confignode/manager/Manager.java | 9 +
.../iotdb/confignode/manager/UDFManager.java | 119 +++++++++++++
.../iotdb/confignode/persistence/UDFInfo.java | 117 ++++++++++++
.../executor/ConfigRequestExecutor.java | 11 +-
.../iotdb/confignode/service/ConfigNode.java | 18 ++
.../thrift/ConfigNodeRPCServiceProcessor.java | 6 +
.../apache/iotdb/commons/conf/IoTDBConstant.java | 1 +
.../apache/iotdb/commons/service/ServiceType.java | 1 +
.../iotdb/commons/udf/service/SnapshotUtils.java | 96 ++++++++++
.../commons/udf/service/UDFClassLoaderManager.java | 8 +
.../commons/udf/service/UDFExecutableManager.java | 196 +++++++++++++++++++++
.../commons/udf/service/UDFExecutableResource.java | 39 ++++
.../iotdb/commons/udf/service/UDFLogWriter.java | 21 ++-
.../udf/service/UDFRegistrationInformation.java | 13 +-
.../udf/service/UDFRegistrationService.java | 127 +++++++++++--
.../apache/iotdb/db/client/ConfigNodeClient.java | 17 ++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 9 +
.../plan/execution/config/ConfigTaskVisitor.java | 8 +
.../plan/execution/config/CreateFunctionTask.java | 117 ++++++++++++
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 29 +++
.../db/mpp/plan/statement/StatementVisitor.java | 6 +
.../metadata/CreateFunctionStatement.java | 73 ++++++++
.../java/org/apache/iotdb/db/service/DataNode.java | 30 ++--
.../service/thrift/impl/InternalServiceImpl.java | 20 +++
.../src/main/thrift/confignode.thrift | 11 ++
thrift/src/main/thrift/mpp.thrift | 15 +-
36 files changed, 1329 insertions(+), 48 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 73e6a76121..326eac60b0 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -115,7 +115,11 @@ createTimeseriesOfSchemaTemplate
// Create Function
createFunction
- : CREATE FUNCTION udfName=identifier AS className=STRING_LITERAL
+ : CREATE FUNCTION udfName=identifier AS className=STRING_LITERAL (USING uri (COMMA uri)*)?
+ ;
+
+uri
+ : STRING_LITERAL
;
// Create Trigger
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index b16a399ec3..8fb85749b6 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -212,9 +212,9 @@ target_confignode=0.0.0.0:22277
# proc_wal_dir=data/proc
-# UDF dir
+# UDF lib dir
# If this property is unset, system will save the data in the default relative path directory under
-# the UDF folder(i.e., %CONFIGNODE_HOME%/data/udf).
+# the UDF folder(i.e., %CONFIGNODE_HOME%/ext/udf).
#
# If it is absolute, system will save the data in exact location it points to.
# If it is relative, system will save the data in the relative path directory it indicates under the
@@ -225,11 +225,31 @@ target_confignode=0.0.0.0:22277
# For Window platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
-# udf_dir=data\\udf
+# udf_lib_dir=ext\\udf
#
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
-# udf_dir=data/udf
+# udf_lib_dir=ext/udf
+
+
+# temporary lib dir
+# If this property is unset, system will save the data in the default relative path directory under
+# the ConfigNode home folder(i.e., %CONFIGNODE_HOME%/ext/temporary).
+#
+# If it is absolute, system will save the data in exact location it points to.
+# If it is relative, system will save the data in the relative path directory it indicates under the
+# ConfigNode home folder.
+# Note: If temporary_lib_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative
+# path.
+#
+# For Windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
+# absolute. Otherwise, it is relative.
+# temporary_lib_dir=ext\\temporary
+#
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# temporary_lib_dir=ext/temporary
####################
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index 96d2fe5f1e..17d93b8f85 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -25,10 +25,12 @@ import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.client.handlers.CreateFunctionHandler;
import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.thrift.TException;
@@ -233,6 +235,20 @@ public class AsyncDataNodeClientPool {
clientManager.clear(endPoint);
}
+ /**
+  * Asynchronously asks one DataNode to create a function. Only used in UDFManager.
+  *
+  * <p>If the request cannot be dispatched (borrowing the client or issuing the call throws), the
+  * failure is logged and forwarded to {@code handler.onError} so that a caller waiting on the
+  * handler is not left blocked.
+  *
+  * @param endPoint The specific DataNode
+  * @param request The create-function request to send
+  * @param handler Callback that receives the DataNode response or the failure
+  */
+ public void createFunction(
+     TEndPoint endPoint, TCreateFunctionRequest request, CreateFunctionHandler handler) {
+   try {
+     clientManager.borrowClient(endPoint).createFunction(request, handler);
+   } catch (Exception e) {
+     LOGGER.error("Failed to asking DataNode to create function: {}", endPoint, e);
+     // NOTE(review): assumes the async callback is not invoked when dispatching throws; without
+     // this notification the handler's latch would never be released for this DataNode.
+     handler.onError(e);
+   }
+ }
+
// TODO: Is the ClientPool must be a singleton?
private static class ClientPoolHolder {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java
new file mode 100644
index 0000000000..e740a5c6a9
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/CreateFunctionHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Callback for an asynchronous create-function request sent to a single DataNode.
+ *
+ * <p>Each invocation appends exactly one {@link TSStatus} to the shared result list and then
+ * counts the shared latch down once. The status is always recorded before the latch is
+ * released, so a thread returning from {@code CountDownLatch.await()} observes the status of
+ * every handler that has finished.
+ */
+public class CreateFunctionHandler implements AsyncMethodCallback<TSStatus> {
+
+  private final CountDownLatch countDownLatch;
+  private final List<TSStatus> dataNodeResponseStatus;
+  private final String ip;
+  private final int port;
+
+  /**
+   * @param countDownLatch latch counted down once when this handler finishes
+   * @param dataNodeResponseStatus shared list that collects the status of every DataNode
+   * @param ip ip of the target DataNode, used to prefix error messages
+   * @param port port of the target DataNode, used to prefix error messages
+   */
+  public CreateFunctionHandler(
+      CountDownLatch countDownLatch, List<TSStatus> dataNodeResponseStatus, String ip, int port) {
+    this.countDownLatch = countDownLatch;
+    this.dataNodeResponseStatus = dataNodeResponseStatus;
+    this.ip = ip;
+    this.port = port;
+  }
+
+  /** Records the DataNode's response, then releases the latch. */
+  @Override
+  public void onComplete(TSStatus response) {
+    try {
+      // Publish the result first: counting down before adding lets the waiter read a list that
+      // does not yet contain this response.
+      dataNodeResponseStatus.add(response);
+    } finally {
+      countDownLatch.countDown();
+    }
+  }
+
+  /** Records a failure status carrying the endpoint and exception message, then releases the latch. */
+  @Override
+  public void onError(Exception exception) {
+    try {
+      dataNodeResponseStatus.add(
+          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+              .setMessage("[" + ip + ":" + port + "] " + exception.getMessage()));
+    } finally {
+      countDownLatch.countDown();
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 5362f9f95b..0ca14d6dd5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -103,9 +103,16 @@ public class ConfigNodeConf {
private String consensusDir =
ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.CONSENSUS_FOLDER;
- /** UDF directory, storage udf metadata and jars */
- private String udfDir =
- ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.UDF_FOLDER;
+ /** External lib directory, stores user-uploaded JAR files */
+ private String extLibDir = IoTDBConstant.EXT_FOLDER_NAME;
+
+ /** External lib directory for UDF, stores user-uploaded JAR files */
+ private String udfLibDir =
+ IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.UDF_FOLDER_NAME;
+
+ /** External temporary lib directory for storing downloaded JAR files */
+ private String temporaryLibDir =
+ IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
/** Time partition interval in seconds */
private long timePartitionInterval = 604800;
@@ -149,6 +156,9 @@ public class ConfigNodeConf {
private void formulateFolders() {
systemDir = addHomeDir(systemDir);
consensusDir = addHomeDir(consensusDir);
+ extLibDir = addHomeDir(extLibDir);
+ udfLibDir = addHomeDir(udfLibDir);
+ temporaryLibDir = addHomeDir(temporaryLibDir);
}
private String addHomeDir(String dir) {
@@ -336,12 +346,24 @@ public class ConfigNodeConf {
this.systemDir = systemDir;
}
- public String getUdfDir() {
- return udfDir;
+ public String getSystemUdfDir() {
+ return getSystemDir() + File.separator + "udf" + File.separator;
+ }
+
+ public String getUdfLibDir() {
+ return udfLibDir;
+ }
+
+ public void setUdfLibDir(String udfLibDir) {
+ this.udfLibDir = udfLibDir;
+ }
+
+ public String getTemporaryLibDir() {
+ return temporaryLibDir;
}
- public void setUdfDir(String udfDir) {
- this.udfDir = udfDir;
+ public void setTemporaryLibDir(String temporaryLibDir) {
+ this.temporaryLibDir = temporaryLibDir;
}
public int getSchemaReplicationFactor() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index b5c58efd92..2a024dd3b0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -175,7 +175,10 @@ public class ConfigNodeDescriptor {
conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
- conf.setUdfDir(properties.getProperty("udf_dir", conf.getUdfDir()));
+ conf.setUdfLibDir(properties.getProperty("udf_lib_dir", conf.getUdfLibDir()));
+
+ conf.setTemporaryLibDir(
+ properties.getProperty("temporary_lib_dir", conf.getTemporaryLibDir()));
conf.setTimePartitionInterval(
Long.parseLong(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
index 50aa61adea..fd7d01ab92 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
@@ -187,6 +188,9 @@ public abstract class ConfigRequest implements IConsensusRequest {
case ApplyConfigNode:
req = new ApplyConfigNodeReq();
break;
+ case CreateFunction:
+ req = new CreateFunctionReq();
+ break;
case GetChildPathsPartition:
req = new GetChildPathsPartitionReq();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
index a4b1606371..11fbc14ff4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestType.java
@@ -59,6 +59,7 @@ public enum ConfigRequestType {
ListUserRoles,
ListRoleUsers,
ApplyConfigNode,
+ CreateFunction,
GetChildPathsPartition,
GetChildNodesPartition;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateFunctionReq.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateFunctionReq.java
new file mode 100644
index 0000000000..4cc034a335
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateFunctionReq.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
+import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Config request carrying the data of a CREATE FUNCTION operation: the function name, the class
+ * name and the list of URIs.
+ */
+public class CreateFunctionReq extends ConfigRequest {
+
+  private String functionName;
+  private String className;
+  private List<String> uris;
+
+  /** Creates an empty request; the fields are filled in by {@link #deserializeImpl}. */
+  public CreateFunctionReq() {
+    super(ConfigRequestType.CreateFunction);
+  }
+
+  /**
+   * @param functionName name of the function
+   * @param className name of the class of the function
+   * @param uris URIs associated with the function
+   */
+  public CreateFunctionReq(String functionName, String className, List<String> uris) {
+    super(ConfigRequestType.CreateFunction);
+    this.functionName = functionName;
+    this.className = className;
+    this.uris = uris;
+  }
+
+  public String getFunctionName() {
+    return functionName;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public List<String> getUris() {
+    return uris;
+  }
+
+  /**
+   * Writes the request type ordinal, the function name, the class name, the number of URIs and
+   * then every URI, in that order.
+   */
+  @Override
+  protected void serializeImpl(ByteBuffer buffer) {
+    buffer.putInt(getType().ordinal());
+
+    ReadWriteIOUtils.write(functionName, buffer);
+    ReadWriteIOUtils.write(className, buffer);
+
+    ReadWriteIOUtils.write(uris.size(), buffer);
+    uris.forEach(uri -> ReadWriteIOUtils.write(uri, buffer));
+  }
+
+  /**
+   * Reads the function name, the class name, the number of URIs and then every URI. The type
+   * ordinal written by {@link #serializeImpl} is not read here (presumably it is consumed by the
+   * caller; verify against ConfigRequest).
+   */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    functionName = ReadWriteIOUtils.readString(buffer);
+    className = ReadWriteIOUtils.readString(buffer);
+
+    final int uriCount = ReadWriteIOUtils.readInt(buffer);
+    uris = new ArrayList<>(uriCount);
+    for (int remaining = uriCount; remaining > 0; --remaining) {
+      uris.add(ReadWriteIOUtils.readString(buffer));
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index fcdd767d39..83775e0fd8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.persistence.PartitionInfo;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.UDFInfo;
import org.apache.iotdb.confignode.persistence.executor.ConfigRequestExecutor;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
@@ -104,6 +105,9 @@ public class ConfigManager implements Manager {
/** Manage procedure */
private final ProcedureManager procedureManager;
+ /** UDF */
+ private final UDFManager udfManager;
+
public ConfigManager() throws IOException {
// Build the persistence module
NodeInfo nodeInfo = new NodeInfo();
@@ -111,11 +115,12 @@ public class ConfigManager implements Manager {
PartitionInfo partitionInfo = new PartitionInfo();
AuthorInfo authorInfo = new AuthorInfo();
ProcedureInfo procedureInfo = new ProcedureInfo();
+ UDFInfo udfInfo = new UDFInfo();
// Build state machine and executor
ConfigRequestExecutor executor =
new ConfigRequestExecutor(
- nodeInfo, clusterSchemaInfo, partitionInfo, authorInfo, procedureInfo);
+ nodeInfo, clusterSchemaInfo, partitionInfo, authorInfo, procedureInfo, udfInfo);
PartitionRegionStateMachine stateMachine = new PartitionRegionStateMachine(this, executor);
// Build the manager module
@@ -124,6 +129,7 @@ public class ConfigManager implements Manager {
this.partitionManager = new PartitionManager(this, partitionInfo);
this.permissionManager = new PermissionManager(this, authorInfo);
this.procedureManager = new ProcedureManager(this, procedureInfo);
+ this.udfManager = new UDFManager(this, udfInfo);
this.loadManager = new LoadManager(this);
this.consensusManager = new ConsensusManager(stateMachine);
@@ -599,6 +605,19 @@ public class ConfigManager implements Manager {
return nodeManager.applyConfigNode(applyConfigNodeReq);
}
+ @Override
+ public TSStatus createFunction(String udfName, String className, List<String> uris) {
+   // Only proceed when the leader check succeeds; otherwise hand its status back unchanged.
+   final TSStatus leaderStatus = confirmLeader();
+   if (leaderStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+     return leaderStatus;
+   }
+   return udfManager.createFunction(udfName, className, uris);
+ }
+
+ @Override
+ public UDFManager getUDFManager() {
+ return udfManager;
+ }
+
public ProcedureManager getProcedureManager() {
return procedureManager;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index 16a07d050a..47892d1fbc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -90,6 +90,13 @@ public interface Manager {
*/
LoadManager getLoadManager();
+ /**
+ * Get UDFManager
+ *
+ * @return UDFManager instance
+ */
+ UDFManager getUDFManager();
+
/**
* Register DataNode
*
@@ -218,4 +225,6 @@ public interface Manager {
* @return status
*/
TSStatus applyConfigNode(ApplyConfigNodeReq applyConfigNodeReq);
+
+ TSStatus createFunction(String udfName, String className, List<String> uris);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
new file mode 100644
index 0000000000..a3f7b03034
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.handlers.CreateFunctionHandler;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
+import org.apache.iotdb.confignode.persistence.UDFInfo;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+
+/**
+ * Coordinates CREATE FUNCTION: validates the request, writes it through the consensus manager
+ * and then asks every online DataNode to create the function.
+ */
+public class UDFManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(UDFManager.class);
+
+  private final ConfigManager configManager;
+  private final UDFInfo udfInfo;
+
+  public UDFManager(ConfigManager configManager, UDFInfo udfInfo) {
+    this.configManager = configManager;
+    this.udfInfo = udfInfo;
+  }
+
+  /**
+   * Creates a function cluster-wide.
+   *
+   * @param functionName name of the function
+   * @param className name of the class of the function
+   * @param uris URIs associated with the function; an empty list is handled by the validation step
+   * @return the consensus write status if that write failed; otherwise SUCCESS when every
+   *     DataNode succeeded, or EXECUTE_STATEMENT_ERROR listing the failed DataNode statuses. Any
+   *     exception is converted into an EXECUTE_STATEMENT_ERROR status.
+   */
+  public TSStatus createFunction(String functionName, String className, List<String> uris) {
+    try {
+      udfInfo.validateBeforeRegistration(functionName, className, uris);
+
+      final TSStatus configNodeStatus =
+          configManager
+              .getConsensusManager()
+              .write(new CreateFunctionReq(functionName, className, uris))
+              .getStatus();
+      if (configNodeStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return configNodeStatus;
+      }
+
+      return squashDataNodeResponseStatusList(
+          createFunctionOnDataNodes(functionName, className, uris));
+    } catch (Exception e) {
+      final String errorMessage =
+          String.format(
+              "Failed to register UDF %s(class name: %s, uris: %s), because of exception: %s",
+              functionName, className, uris, e);
+      // Pass the throwable as well so the stack trace is kept, as UDFInfo#createFunction does.
+      LOGGER.warn(errorMessage, e);
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+          .setMessage(errorMessage);
+    }
+  }
+
+  /**
+   * Sends the create-function request to every online DataNode and waits for all callbacks.
+   *
+   * @return a snapshot of the statuses collected so far; if the wait is interrupted, the snapshot
+   *     additionally contains a failure status so the result cannot be mistaken for success
+   */
+  private List<TSStatus> createFunctionOnDataNodes(
+      String functionName, String className, List<String> uris) {
+    final List<TDataNodeInfo> onlineDataNodes =
+        configManager.getNodeManager().getOnlineDataNodes(-1);
+    final List<TSStatus> dataNodeResponseStatus =
+        Collections.synchronizedList(new ArrayList<>(onlineDataNodes.size()));
+    final CountDownLatch countDownLatch = new CountDownLatch(onlineDataNodes.size());
+    final TCreateFunctionRequest request =
+        new TCreateFunctionRequest(functionName, className, uris);
+
+    for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+      final TEndPoint endPoint = dataNodeInfo.getLocation().getInternalEndPoint();
+      AsyncDataNodeClientPool.getInstance()
+          .createFunction(
+              endPoint,
+              request,
+              new CreateFunctionHandler(
+                  countDownLatch, dataNodeResponseStatus, endPoint.getIp(), endPoint.getPort()));
+    }
+
+    try {
+      countDownLatch.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.error("UDFManager was interrupted during creating functions on data nodes", e);
+      // Not every DataNode has answered: record a failure so that a partial (possibly empty)
+      // list is not squashed into SUCCESS.
+      dataNodeResponseStatus.add(
+          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+              .setMessage(
+                  "Interrupted while waiting for DataNodes to create function " + functionName));
+    }
+
+    // Callbacks may still append after an interruption; copy under the list's own lock so the
+    // caller iterates over a stable snapshot.
+    synchronized (dataNodeResponseStatus) {
+      return new ArrayList<>(dataNodeResponseStatus);
+    }
+  }
+
+  /**
+   * Collapses the per-DataNode statuses into one: SUCCESS if none failed, otherwise
+   * EXECUTE_STATEMENT_ERROR whose message is the string form of the failed statuses.
+   */
+  private TSStatus squashDataNodeResponseStatusList(List<TSStatus> dataNodeResponseStatusList) {
+    final List<TSStatus> failedStatus =
+        dataNodeResponseStatusList.stream()
+            .filter(status -> status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
+            .collect(Collectors.toList());
+    return failedStatus.isEmpty()
+        ? new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
+        : new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+            .setMessage(failedStatus.toString());
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
new file mode 100644
index 0000000000..f0aa594d44
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.udf.service.UDFClassLoader;
+import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.udf.service.UDFExecutableResource;
+import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
+import org.apache.iotdb.confignode.conf.ConfigNodeConf;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Persistence-side entry point for UDFs on the ConfigNode: validates a function before
+ * registration, registers it, and delegates snapshot handling to the executable manager and the
+ * registration service.
+ */
+public class UDFInfo implements SnapshotProcessor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(UDFInfo.class);
+
+  private static final ConfigNodeConf CONFIG_NODE_CONF =
+      ConfigNodeDescriptor.getInstance().getConf();
+
+  private final UDFExecutableManager udfExecutableManager;
+  private final UDFRegistrationService udfRegistrationService;
+
+  public UDFInfo() {
+    udfExecutableManager = UDFExecutableManager.getInstance();
+    udfRegistrationService = UDFRegistrationService.getInstance();
+  }
+
+  /**
+   * Validates the function with the registration service and checks that the class can be
+   * loaded and instantiated, either from the UDF lib dir (no URIs) or from the executables
+   * requested for the given URIs.
+   *
+   * @throws Exception if validation, fetching, class loading or instantiation fails
+   */
+  public synchronized void validateBeforeRegistration(
+      String functionName, String className, List<String> uris) throws Exception {
+    udfRegistrationService.validate(functionName, className);
+
+    if (!uris.isEmpty()) {
+      fetchExecutablesAndCheckInstantiation(className, uris);
+      return;
+    }
+    fetchExecutablesAndCheckInstantiation(className);
+  }
+
+  /** Checks instantiation using a temporary class loader over the configured UDF lib dir. */
+  private void fetchExecutablesAndCheckInstantiation(String className) throws Exception {
+    try (UDFClassLoader libDirLoader = new UDFClassLoader(CONFIG_NODE_CONF.getUdfLibDir())) {
+      instantiateOnce(className, libDirLoader);
+    }
+  }
+
+  /**
+   * Requests the executables for the URIs, checks instantiation using a temporary class loader
+   * over the resource dir, and always removes the resource from the temporary lib root.
+   */
+  private void fetchExecutablesAndCheckInstantiation(String className, List<String> uris)
+      throws Exception {
+    final UDFExecutableResource fetched = udfExecutableManager.request(uris);
+    try (UDFClassLoader resourceLoader = new UDFClassLoader(fetched.getResourceDir())) {
+      instantiateOnce(className, resourceLoader);
+    } finally {
+      udfExecutableManager.removeFromTemporaryLibRoot(fetched);
+    }
+  }
+
+  /** Loads and initializes the class with the loader and calls its no-arg constructor. */
+  private static void instantiateOnce(String className, UDFClassLoader loader) throws Exception {
+    final Class<?> udfClass = Class.forName(className, true, loader);
+    udfClass.getDeclaredConstructor().newInstance();
+  }
+
+  /**
+   * Registers the function described by the request.
+   *
+   * @return SUCCESS, or EXECUTE_STATEMENT_ERROR carrying the failure description
+   */
+  public synchronized TSStatus createFunction(CreateFunctionReq req) {
+    final String functionName = req.getFunctionName();
+    final String className = req.getClassName();
+    final List<String> uris = req.getUris();
+
+    try {
+      udfRegistrationService.register(functionName, className, uris, udfExecutableManager, true);
+    } catch (Exception e) {
+      final String errorMessage =
+          String.format(
+              "Failed to register UDF %s(class name: %s, uris: %s), because of exception: %s",
+              functionName, className, uris, e);
+      LOGGER.warn(errorMessage, e);
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+          .setMessage(errorMessage);
+    }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  /**
+   * Takes the snapshot of the executable manager first; the registration service is only asked
+   * when that succeeded.
+   */
+  @Override
+  public synchronized boolean processTakeSnapshot(File snapshotDir) throws IOException {
+    if (!udfExecutableManager.processTakeSnapshot(snapshotDir)) {
+      return false;
+    }
+    return udfRegistrationService.processTakeSnapshot(snapshotDir);
+  }
+
+  /** Loads the snapshot into the executable manager, then into the registration service. */
+  @Override
+  public synchronized void processLoadSnapshot(File snapshotDir) throws IOException {
+    udfExecutableManager.processLoadSnapshot(snapshotDir);
+    udfRegistrationService.processLoadSnapshot(snapshotDir);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
index dfad8c10c4..1a008ffbcf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigRequestExecutor.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
+import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.DeleteProcedureReq;
@@ -54,6 +55,7 @@ import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.persistence.PartitionInfo;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.persistence.UDFInfo;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -83,17 +85,21 @@ public class ConfigRequestExecutor {
private final ProcedureInfo procedureInfo;
+ private final UDFInfo udfInfo;
+
public ConfigRequestExecutor(
NodeInfo nodeInfo,
ClusterSchemaInfo clusterSchemaInfo,
PartitionInfo partitionInfo,
AuthorInfo authorInfo,
- ProcedureInfo procedureInfo) {
+ ProcedureInfo procedureInfo,
+ UDFInfo udfInfo) {
this.nodeInfo = nodeInfo;
this.clusterSchemaInfo = clusterSchemaInfo;
this.partitionInfo = partitionInfo;
this.authorInfo = authorInfo;
this.procedureInfo = procedureInfo;
+ this.udfInfo = udfInfo;
}
public DataSet executorQueryPlan(ConfigRequest req)
@@ -181,6 +187,8 @@ public class ConfigRequestExecutor {
return authorInfo.authorNonQuery((AuthorReq) req);
case ApplyConfigNode:
return nodeInfo.updateConfigNodeList((ApplyConfigNodeReq) req);
+ case CreateFunction:
+ return udfInfo.createFunction((CreateFunctionReq) req);
default:
throw new UnknownPhysicalPlanTypeException(req.getType());
}
@@ -290,6 +298,7 @@ public class ConfigRequestExecutor {
allAttributes.add(clusterSchemaInfo);
allAttributes.add(partitionInfo);
allAttributes.add(nodeInfo);
+ allAttributes.add(udfInfo);
return allAttributes;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 8a431f2522..c4641bc4d9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -21,7 +21,12 @@ package org.apache.iotdb.confignode.service;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
+import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
+import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
+import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
@@ -75,11 +80,24 @@ public class ConfigNode implements ConfigNodeMBean {
registerManager.register(new JMXService());
JMXService.registerMBean(this, mbeanName);
+ registerUdfServices();
+
configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
registerManager.register(configNodeRPCService);
LOGGER.info("Init rpc server success");
}
+ /**
+  * Creates the three UDF services from the ConfigNode configuration and registers each one with
+  * the register manager, in this order: executable manager, class loader manager, registration
+  * service. Each service is set up immediately before it is registered.
+  *
+  * @throws StartupException propagated from the register manager
+  */
+ private void registerUdfServices() throws StartupException {
+   final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
+
+   final UDFExecutableManager executableManager =
+       UDFExecutableManager.setupAndGetInstance(conf.getTemporaryLibDir(), conf.getUdfLibDir());
+   registerManager.register(executableManager);
+
+   final UDFClassLoaderManager classLoaderManager =
+       UDFClassLoaderManager.setupAndGetInstance(conf.getUdfLibDir());
+   registerManager.register(classLoaderManager);
+
+   final UDFRegistrationService registrationService =
+       UDFRegistrationService.setupAndGetInstance(conf.getSystemUdfDir());
+   registerManager.register(registrationService);
+ }
+
public void active() {
try {
setUp();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 6cc00fb56f..cbb0e54424 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -388,6 +389,11 @@ public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
return status;
}
+ /**
+ * Forwards the UDF name, class name and URIs carried by {@code req} to the config manager and
+ * returns the status it produces unchanged.
+ */
+ @Override
+ public TSStatus createFunction(TCreateFunctionReq req) {
+ return configManager.createFunction(req.getUdfName(), req.getClassName(), req.getUris());
+ }
+
public void handleClientExit() {}
// TODO: Interfaces for data operations
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 7a481933c1..10e335f910 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -175,6 +175,7 @@ public class IoTDBConstant {
public static final String TRACING_LOG = "tracing.txt";
public static final String EXT_FOLDER_NAME = "ext";
public static final String UDF_FOLDER_NAME = "udf";
+ public static final String TMP_FOLDER_NAME = "temporary";
public static final String TRIGGER_FOLDER_NAME = "trigger";
public static final String MQTT_FOLDER_NAME = "mqtt";
public static final String WAL_FOLDER_NAME = "wal";
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 1e913818cc..02ca71b26c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -44,6 +44,7 @@ public enum ServiceType {
TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
UDF_CLASSLOADER_MANAGER_SERVICE("UDF Classloader Manager Service", ""),
UDF_REGISTRATION_SERVICE("UDF Registration Service", ""),
+ UDF_EXECUTABLE_MANAGER_SERVICE("UDF Executable Manager Service", ""),
TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", ""),
TRIGGER_REGISTRATION_SERVICE("Trigger Registration Service", ""),
CACHE_HIT_RATIO_DISPLAY_SERVICE(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/SnapshotUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/SnapshotUtils.java
new file mode 100644
index 0000000000..9c30095196
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/SnapshotUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.service;
+
+import org.apache.iotdb.commons.file.SystemFileFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+public class SnapshotUtils {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(UDFExecutableManager.class);
+
+ public static boolean takeSnapshotForDir(String source, String snapshotDestination)
+ throws IOException {
+ final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
+ final File sourceFile = systemFileFactory.getFile(source);
+ final File destinationFile = systemFileFactory.getFile(snapshotDestination);
+ final File temporaryFile =
+ systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID());
+
+ if (!sourceFile.exists()) {
+ return true;
+ }
+
+ FileUtils.deleteQuietly(temporaryFile);
+ FileUtils.forceMkdir(temporaryFile);
+
+ try {
+ FileUtils.copyDirectory(sourceFile, temporaryFile);
+ FileUtils.deleteQuietly(destinationFile);
+ return temporaryFile.renameTo(destinationFile);
+ } finally {
+ FileUtils.deleteQuietly(temporaryFile);
+ }
+ }
+
+ public static void loadSnapshotForDir(String snapshotSource, String destination)
+ throws IOException {
+ final SystemFileFactory systemFileFactory = SystemFileFactory.INSTANCE;
+ final File sourceFile = systemFileFactory.getFile(snapshotSource);
+ final File destinationFile = systemFileFactory.getFile(destination);
+ final File temporaryFile =
+ systemFileFactory.getFile(destinationFile.getAbsolutePath() + "-" + UUID.randomUUID());
+
+ if (!sourceFile.exists()) {
+ return;
+ }
+
+ try {
+ if (destinationFile.exists()) {
+ FileUtils.deleteQuietly(temporaryFile);
+ FileUtils.moveDirectory(destinationFile, temporaryFile);
+ }
+
+ FileUtils.forceMkdir(destinationFile);
+
+ try {
+ FileUtils.copyDirectory(sourceFile, destinationFile);
+ } catch (Exception e) {
+ LOGGER.error("Failed to load udf snapshot and rollback.");
+ FileUtils.deleteQuietly(destinationFile);
+
+ if (temporaryFile.exists()) {
+ FileUtils.moveDirectory(temporaryFile, destinationFile);
+ }
+ }
+ } finally {
+ FileUtils.deleteQuietly(temporaryFile);
+ }
+ }
+
+ private SnapshotUtils() {}
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
index 02fb489021..8d92f4ccff 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
@@ -82,6 +82,10 @@ public class UDFClassLoaderManager implements IService {
return activeClassLoader;
}
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ // IService
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+
@Override
public void start() throws StartupException {
try {
@@ -110,6 +114,10 @@ public class UDFClassLoaderManager implements IService {
return ServiceType.UDF_CLASSLOADER_MANAGER_SERVICE;
}
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ // singleton instance holder
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+
private static UDFClassLoaderManager INSTANCE = null;
public static synchronized UDFClassLoaderManager setupAndGetInstance(String libRoot) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
new file mode 100644
index 0000000000..f7f18d7171
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.service;
+
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Service that fetches the files referenced by UDF URIs into per-request directories under a
+ * temporary root, moves them into per-function directories under the UDF library root, and
+ * snapshots or restores both roots.
+ *
+ * <p>A single instance is created by {@link #setupAndGetInstance(String, String)} and afterwards
+ * available through {@link #getInstance()}.
+ */
+public class UDFExecutableManager implements IService, SnapshotProcessor {
+
+  private final String temporaryLibRoot;
+  private final String udfLibRoot;
+
+  /** Source of candidate request ids; see {@link #generateNextRequestId()}. */
+  private final AtomicLong requestCounter = new AtomicLong(0);
+
+  private UDFExecutableManager(String temporaryLibRoot, String udfLibRoot) {
+    this.temporaryLibRoot = temporaryLibRoot;
+    this.udfLibRoot = udfLibRoot;
+  }
+
+  /**
+   * Downloads every URI into a newly created request directory under the temporary root.
+   *
+   * @param uris locations of the files to download
+   * @return a handle holding the request id and the request directory path
+   * @throws URISyntaxException if one of the strings is not a valid URI
+   * @throws IOException if the request directory cannot be created or a download fails
+   */
+  public UDFExecutableResource request(List<String> uris) throws URISyntaxException, IOException {
+    final long id = generateNextRequestId();
+    downloadExecutables(uris, id);
+    final String downloadDir = getDirStringByRequestId(id);
+    return new UDFExecutableResource(id, downloadDir);
+  }
+
+  /**
+   * Moves the request directory of {@code resource} to the directory of {@code functionName}
+   * under the UDF library root.
+   *
+   * @throws IOException if the directory cannot be moved
+   */
+  public void moveToExtLibDir(UDFExecutableResource resource, String functionName)
+      throws IOException {
+    final File from = getDirByRequestId(resource.getRequestId());
+    final File to = getDirByFunctionName(functionName);
+    FileUtils.moveDirectory(from, to);
+  }
+
+  /** Deletes the request directory of {@code resource}; deletion failures are ignored. */
+  public void removeFromTemporaryLibRoot(UDFExecutableResource resource) {
+    removeFromTemporaryLibRoot(resource.getRequestId());
+  }
+
+  /** Deletes the directory of {@code functionName}; deletion failures are ignored. */
+  public void removeFromExtLibDir(String functionName) {
+    final File functionDir = getDirByFunctionName(functionName);
+    FileUtils.deleteQuietly(functionDir);
+  }
+
+  /**
+   * Draws ids from the counter until one is found whose request directory does not exist yet,
+   * creates that directory and returns the id.
+   */
+  private synchronized long generateNextRequestId() throws IOException {
+    long candidate;
+    File candidateDir;
+    do {
+      candidate = requestCounter.getAndIncrement();
+      candidateDir = getDirByRequestId(candidate);
+    } while (FileUtils.isDirectory(candidateDir));
+
+    FileUtils.forceMkdir(candidateDir);
+    return candidate;
+  }
+
+  /**
+   * Copies each URI into the request directory, naming the file after the text following the last
+   * {@code /} of the URI string. On any failure the whole request directory is deleted before the
+   * exception is rethrown.
+   */
+  private void downloadExecutables(List<String> uris, long requestId)
+      throws IOException, URISyntaxException {
+    // TODO: download the executables in parallel
+    final String requestDir = getDirStringByRequestId(requestId);
+    try {
+      for (String uri : uris) {
+        final URL remote = new URI(uri).toURL();
+        final int lastSlash = uri.lastIndexOf("/");
+        final File target =
+            FSFactoryProducer.getFSFactory().getFile(requestDir + uri.substring(lastSlash + 1));
+        FileUtils.copyURLToFile(remote, target);
+      }
+    } catch (Exception e) {
+      removeFromTemporaryLibRoot(requestId);
+      throw e;
+    }
+  }
+
+  private void removeFromTemporaryLibRoot(long requestId) {
+    final File requestDir = getDirByRequestId(requestId);
+    FileUtils.deleteQuietly(requestDir);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // dir string and dir file generation
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /** Returns the request directory of {@code requestId} as a file. */
+  public File getDirByRequestId(long requestId) {
+    final String path = getDirStringByRequestId(requestId);
+    return FSFactoryProducer.getFSFactory().getFile(path);
+  }
+
+  /** Returns {@code <temporaryLibRoot>/<requestId>/} using the platform separator. */
+  public String getDirStringByRequestId(long requestId) {
+    return temporaryLibRoot + File.separator + requestId + File.separator;
+  }
+
+  /** Returns the directory of {@code functionName} as a file. */
+  public File getDirByFunctionName(String functionName) {
+    final String path = getDirStringByFunctionName(functionName);
+    return FSFactoryProducer.getFSFactory().getFile(path);
+  }
+
+  /** Returns {@code <udfLibRoot>/<functionName>/} using the platform separator. */
+  public String getDirStringByFunctionName(String functionName) {
+    return udfLibRoot + File.separator + functionName + File.separator;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // IService
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Makes sure the temporary root and the UDF library root exist as directories.
+   *
+   * @throws StartupException wrapping any failure to create them
+   */
+  @Override
+  public void start() throws StartupException {
+    try {
+      for (String root : new String[] {temporaryLibRoot, udfLibRoot}) {
+        makeDirIfNecessary(root);
+      }
+    } catch (Exception e) {
+      throw new StartupException(e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    // nothing to do
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.UDF_EXECUTABLE_MANAGER_SERVICE;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // singleton instance holder
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  private static UDFExecutableManager INSTANCE = null;
+
+  /**
+   * Creates the instance on the first call and returns it; later calls return the existing
+   * instance and ignore their arguments.
+   */
+  public static synchronized UDFExecutableManager setupAndGetInstance(
+      String temporaryLibRoot, String udfLibRoot) {
+    if (INSTANCE != null) {
+      return INSTANCE;
+    }
+    INSTANCE = new UDFExecutableManager(temporaryLibRoot, udfLibRoot);
+    return INSTANCE;
+  }
+
+  /** Creates {@code dir} (including parents) unless it already exists as a directory. */
+  private static void makeDirIfNecessary(String dir) throws IOException {
+    final File directory = SystemFileFactory.INSTANCE.getFile(dir);
+    if (!directory.exists() || !directory.isDirectory()) {
+      FileUtils.forceMkdir(directory);
+    }
+  }
+
+  /** Returns the instance, or {@code null} if it has not been set up yet. */
+  public static UDFExecutableManager getInstance() {
+    return INSTANCE;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // SnapshotProcessor
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Snapshots the temporary root into {@code <snapshotDir>/ext/temporary} and, only if that
+   * succeeded, the UDF library root into {@code <snapshotDir>/ext/udf}.
+   *
+   * @return {@code true} only when both directory snapshots report success
+   */
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+    if (!SnapshotUtils.takeSnapshotForDir(
+        temporaryLibRoot, snapshotPathOf(snapshotDir, "temporary"))) {
+      return false;
+    }
+    return SnapshotUtils.takeSnapshotForDir(udfLibRoot, snapshotPathOf(snapshotDir, "udf"));
+  }
+
+  /**
+   * Restores the temporary root from {@code <snapshotDir>/ext/temporary} and then the UDF library
+   * root from {@code <snapshotDir>/ext/udf}.
+   */
+  @Override
+  public void processLoadSnapshot(File snapshotDir) throws IOException {
+    SnapshotUtils.loadSnapshotForDir(snapshotPathOf(snapshotDir, "temporary"), temporaryLibRoot);
+    SnapshotUtils.loadSnapshotForDir(snapshotPathOf(snapshotDir, "udf"), udfLibRoot);
+  }
+
+  /** Builds {@code <snapshotDir>/ext/<leaf>} using the platform separator. */
+  private static String snapshotPathOf(File snapshotDir, String leaf) {
+    return snapshotDir.getAbsolutePath() + File.separator + "ext" + File.separator + leaf;
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
new file mode 100644
index 0000000000..ec0c375997
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableResource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.service;
+
+/**
+ * Immutable handle for one executable request: the id of the request and the directory its files
+ * were placed in.
+ */
+public class UDFExecutableResource {
+
+  /** Directory that holds the files belonging to this request. */
+  private final String resourceDir;
+
+  /** Identifier of the request this resource belongs to. */
+  private final long requestId;
+
+  /**
+   * @param requestId identifier of the request
+   * @param resourceDir directory that holds the files of the request
+   */
+  public UDFExecutableResource(long requestId, String resourceDir) {
+    this.resourceDir = resourceDir;
+    this.requestId = requestId;
+  }
+
+  /** Returns the request id passed to the constructor. */
+  public long getRequestId() {
+    return this.requestId;
+  }
+
+  /** Returns the resource directory passed to the constructor. */
+  public String getResourceDir() {
+    return this.resourceDir;
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFLogWriter.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFLogWriter.java
index 51068cf99b..54e8bde4a0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFLogWriter.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFLogWriter.java
@@ -25,14 +25,18 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.util.List;
public class UDFLogWriter {
- public static final Byte REGISTER_TYPE = 0;
+ public static final Byte REGISTER_WITHOUT_URIS_TYPE = 0;
public static final Byte DEREGISTER_TYPE = 1;
+ public static final Byte REGISTER_WITH_URIS_TYPE = 2;
- private static final String REGISTER_TYPE_STRING = REGISTER_TYPE.toString();
+ private static final String REGISTER_WITHOUT_URIS_TYPE_STRING =
+ REGISTER_WITHOUT_URIS_TYPE.toString();
private static final String DEREGISTER_TYPE_STRING = DEREGISTER_TYPE.toString();
+ private static final String REGISTER_WITH_URIS_TYPE_STRING = REGISTER_WITH_URIS_TYPE.toString();
private final File logFile;
private final BufferedWriter writer;
@@ -53,8 +57,17 @@ public class UDFLogWriter {
}
}
- public void register(String functionName, String className) throws IOException {
- writer.write(String.format("%s,%s,%s", REGISTER_TYPE_STRING, functionName, className));
+ public void register(String functionName, String className, List<String> uris)
+ throws IOException {
+ if (uris.isEmpty()) {
+ writer.write(
+ String.format("%s,%s,%s", REGISTER_WITHOUT_URIS_TYPE_STRING, functionName, className));
+ } else {
+ writer.write(
+ String.format(
+ "%s,%s,%s,%s",
+ REGISTER_WITH_URIS_TYPE_STRING, functionName, className, String.join(",", uris)));
+ }
writeLineAndFlush();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationInformation.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationInformation.java
index a4997ead0b..feb799f2db 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationInformation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationInformation.java
@@ -22,19 +22,26 @@ package org.apache.iotdb.commons.udf.service;
import org.apache.iotdb.commons.udf.api.UDTF;
import java.lang.reflect.InvocationTargetException;
+import java.util.List;
public class UDFRegistrationInformation {
private final String functionName;
private final String className;
+ private final List<String> uris;
private final boolean isBuiltin;
private Class<?> functionClass;
public UDFRegistrationInformation(
- String functionName, String className, boolean isBuiltin, Class<?> functionClass) {
+ String functionName,
+ String className,
+ List<String> uris,
+ boolean isBuiltin,
+ Class<?> functionClass) {
this.functionName = functionName;
this.className = className;
+ this.uris = uris;
this.isBuiltin = isBuiltin;
this.functionClass = functionClass;
}
@@ -47,6 +54,10 @@ public class UDFRegistrationInformation {
return className;
}
+ /** Returns the URI list that was passed to the constructor (the same list, not a copy). */
+ public List<String> getUris() {
+ return uris;
+ }
+
public boolean isBuiltin() {
return isBuiltin;
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
index 3f9be546d1..46a37eb356 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFRegistrationService.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.udf.api.UDF;
import org.apache.iotdb.commons.udf.api.exception.UDFRegistrationException;
import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
@@ -38,14 +39,17 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class UDFRegistrationService implements IService {
+public class UDFRegistrationService implements IService, SnapshotProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(UDFRegistrationService.class);
@@ -54,7 +58,7 @@ public class UDFRegistrationService implements IService {
private final String temporaryLogFileName;
private final ReentrantLock registrationLock;
- private final ConcurrentHashMap<String, UDFRegistrationInformation> registrationInformation;
+ private ConcurrentHashMap<String, UDFRegistrationInformation> registrationInformation;
private final ReentrantReadWriteLock logWriterLock;
private UDFLogWriter logWriter;
@@ -77,13 +81,36 @@ public class UDFRegistrationService implements IService {
registrationLock.unlock();
}
+ /**
+  * Invoked by the config leader for validation before registration.
+  *
+  * <p>Upper-cases the function name, then runs the function-name validation followed by the
+  * already-registered check. Nothing is registered by this method.
+  *
+  * @param functionName name of the function to validate, in any case
+  * @param className class name passed along to both checks
+  */
+ public void validate(String functionName, String className) {
+   final String upperCasedName = functionName.toUpperCase();
+   validateFunctionName(upperCasedName, className);
+   checkIfRegistered(upperCasedName, className);
+ }
+
public void register(String functionName, String className, boolean writeToTemporaryLogFile)
throws UDFRegistrationException {
functionName = functionName.toUpperCase();
validateFunctionName(functionName, className);
checkIfRegistered(functionName, className);
- doRegister(functionName, className);
- tryAppendRegistrationLog(functionName, className, writeToTemporaryLogFile);
+ doRegister(functionName, className, Collections.emptyList());
+ tryAppendRegistrationLog(
+ functionName, className, Collections.emptyList(), writeToTemporaryLogFile);
+ }
+
+ /**
+  * Registers a UDF whose executables are fetched from {@code uris}.
+  *
+  * <p>Steps, in order: upper-case the function name, validate the name, check whether it is
+  * already registered, fetch the executables (skipped when {@code uris} is empty), register the
+  * function, and append a registration log entry if requested.
+  *
+  * @param functionName name of the function, in any case
+  * @param className name of the class implementing the function
+  * @param uris locations of the executables; may be empty
+  * @param udfExecutableManager manager used to fetch and place the executables
+  * @param writeToTemporaryLogFile passed through to the log-append step
+  * @throws UDFRegistrationException if any of the steps fails
+  */
+ public void register(
+     String functionName,
+     String className,
+     List<String> uris,
+     UDFExecutableManager udfExecutableManager,
+     boolean writeToTemporaryLogFile)
+     throws UDFRegistrationException {
+   final String upperCasedName = functionName.toUpperCase();
+
+   validateFunctionName(upperCasedName, className);
+   checkIfRegistered(upperCasedName, className);
+
+   downloadExecutableResources(upperCasedName, className, uris, udfExecutableManager);
+   doRegister(upperCasedName, className, uris);
+   tryAppendRegistrationLog(upperCasedName, className, uris, writeToTemporaryLogFile);
+ }
private static void validateFunctionName(String functionName, String className)
@@ -132,7 +159,38 @@ public class UDFRegistrationService implements IService {
throw new UDFRegistrationException(errorMessage);
}
- private void doRegister(String functionName, String className) throws UDFRegistrationException {
+ /**
+  * Fetches the executables behind {@code uris} and installs them as the library directory of
+  * {@code functionName}. Does nothing when {@code uris} is empty.
+  *
+  * @throws UDFRegistrationException wrapping any failure to fetch or install the executables
+  */
+ private void downloadExecutableResources(
+     String functionName,
+     String className,
+     List<String> uris,
+     UDFExecutableManager udfExecutableManager)
+     throws UDFRegistrationException {
+   if (uris.isEmpty()) {
+     return;
+   }
+
+   try {
+     installExecutables(udfExecutableManager.request(uris), functionName, udfExecutableManager);
+   } catch (Exception cause) {
+     final String failure =
+         String.format(
+             "Failed to register UDF %s(%s) because failed to fetch UDF executables(%s)",
+             functionName, className, uris);
+     LOGGER.warn(failure);
+     throw new UDFRegistrationException(failure, cause);
+   }
+ }
+
+ /**
+  * Replaces the library directory of {@code functionName} with the downloaded resource; on
+  * failure removes both the function directory and the downloaded resource before rethrowing.
+  */
+ private static void installExecutables(
+     UDFExecutableResource resource,
+     String functionName,
+     UDFExecutableManager udfExecutableManager)
+     throws IOException {
+   try {
+     udfExecutableManager.removeFromExtLibDir(functionName);
+     udfExecutableManager.moveToExtLibDir(resource, functionName);
+   } catch (Exception e) {
+     udfExecutableManager.removeFromExtLibDir(functionName);
+     udfExecutableManager.removeFromTemporaryLibRoot(resource);
+     throw e;
+   }
+ }
+
+ private void doRegister(String functionName, String className, List<String> uris)
+ throws UDFRegistrationException {
acquireRegistrationLock();
try {
UDFClassLoader currentActiveClassLoader =
@@ -143,7 +201,7 @@ public class UDFRegistrationService implements IService {
functionClass.getDeclaredConstructor().newInstance();
registrationInformation.put(
functionName,
- new UDFRegistrationInformation(functionName, className, false, functionClass));
+ new UDFRegistrationInformation(functionName, className, uris, false, functionClass));
} catch (IOException
| InstantiationException
| InvocationTargetException
@@ -162,14 +220,14 @@ public class UDFRegistrationService implements IService {
}
private void tryAppendRegistrationLog(
- String functionName, String className, boolean writeToTemporaryLogFile)
+ String functionName, String className, List<String> uris, boolean writeToTemporaryLogFile)
throws UDFRegistrationException {
if (!writeToTemporaryLogFile) {
return;
}
try {
- appendRegistrationLog(functionName, className);
+ appendRegistrationLog(functionName, className, uris);
} catch (IOException e) {
registrationInformation.remove(functionName);
String errorMessage =
@@ -218,10 +276,11 @@ public class UDFRegistrationService implements IService {
}
}
- private void appendRegistrationLog(String functionName, String className) throws IOException {
+ private void appendRegistrationLog(String functionName, String className, List<String> uris)
+ throws IOException {
logWriterLock.writeLock().lock();
try {
- logWriter.register(functionName, className);
+ logWriter.register(functionName, className, uris);
} finally {
logWriterLock.writeLock().unlock();
}
@@ -275,15 +334,20 @@ public class UDFRegistrationService implements IService {
@Override
public void start() throws StartupException {
try {
- registerBuiltinTimeSeriesGeneratingFunctions();
- makeDirIfNecessary();
- doRecovery();
- logWriter = new UDFLogWriter(logFileName);
+ recovery();
} catch (Exception e) {
throw new StartupException(e);
}
}
+ private void recovery() throws Exception {
+ registrationInformation = new ConcurrentHashMap<>();
+ registerBuiltinTimeSeriesGeneratingFunctions();
+ makeDirIfNecessary();
+ doRecovery();
+ logWriter = new UDFLogWriter(logFileName);
+ }
+
private void registerBuiltinTimeSeriesGeneratingFunctions() {
for (BuiltinTimeSeriesGeneratingFunction builtinTimeSeriesGeneratingFunction :
BuiltinTimeSeriesGeneratingFunction.values()) {
@@ -293,6 +357,7 @@ public class UDFRegistrationService implements IService {
new UDFRegistrationInformation(
functionName,
builtinTimeSeriesGeneratingFunction.getClassName(),
+ Collections.emptyList(),
true,
builtinTimeSeriesGeneratingFunction.getFunctionClass()));
}
@@ -331,10 +396,13 @@ public class UDFRegistrationService implements IService {
while ((line = reader.readLine()) != null) {
String[] data = line.split(",");
byte type = Byte.parseByte(data[0]);
- if (type == UDFLogWriter.REGISTER_TYPE) {
+ if (type == UDFLogWriter.REGISTER_WITHOUT_URIS_TYPE
+ || type == UDFLogWriter.REGISTER_WITH_URIS_TYPE) {
recoveredUDFs.put(data[1], data[2]);
} else if (type == UDFLogWriter.DEREGISTER_TYPE) {
recoveredUDFs.remove(data[1]);
+ } else {
+ throw new UnsupportedEncodingException();
}
}
}
@@ -370,7 +438,8 @@ public class UDFRegistrationService implements IService {
if (information.isBuiltin()) {
continue;
}
- temporaryLogFile.register(information.getFunctionName(), information.getClassName());
+ temporaryLogFile.register(
+ information.getFunctionName(), information.getClassName(), information.getUris());
}
temporaryLogFile.close();
}
@@ -391,7 +460,9 @@ public class UDFRegistrationService implements IService {
Class<?> functionClass = Class.forName(className, true, classLoader);
functionName = functionName.toUpperCase();
registrationInformation.put(
- functionName, new UDFRegistrationInformation(functionName, className, true, functionClass));
+ functionName,
+ new UDFRegistrationInformation(
+ functionName, className, Collections.emptyList(), true, functionClass));
}
@TestOnly
@@ -416,4 +487,26 @@ public class UDFRegistrationService implements IService {
public static UDFRegistrationService getInstance() {
return INSTANCE;
}
+
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ // SnapshotProcessor
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+  * Snapshots the UDF log directory into the {@code udf} sub-directory of the given snapshot
+  * directory.
+  *
+  * @param snapshotDir root directory of the snapshot being taken
+  * @return the value reported by {@code SnapshotUtils.takeSnapshotForDir}
+  * @throws IOException if the snapshot utility fails
+  */
+ @Override
+ public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+   final String udfSnapshotDir = snapshotDir.getAbsolutePath() + File.separator + "udf";
+   return SnapshotUtils.takeSnapshotForDir(ulogFileDir, udfSnapshotDir);
+ }
+
+ /**
+  * Restores the UDF log directory from the {@code udf} sub-directory of the given snapshot
+  * directory and then re-runs recovery so the in-memory state matches the restored files.
+  *
+  * @param snapshotDir root directory of the snapshot being loaded
+  * @throws IOException if copying the snapshot fails, or wrapping any exception raised while
+  *     recovering
+  */
+ @Override
+ public void processLoadSnapshot(File snapshotDir) throws IOException {
+   final String udfSnapshotDir = snapshotDir.getAbsolutePath() + File.separator + "udf";
+   SnapshotUtils.loadSnapshotForDir(udfSnapshotDir, ulogFileDir);
+
+   try {
+     recovery();
+   } catch (Exception recoveryFailure) {
+     // Callers only expect IOException; keep the original failure as the cause.
+     throw new IOException(recoveryFailure);
+   }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index ebd6034542..2e0ce66ead 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -588,6 +589,22 @@ public class ConfigNodeClient implements ConfigIService.Iface, SyncThriftClient,
throw new TException(MSG_RECONNECTION_FAIL);
}
+ /**
+ * Sends a create-function request to the config node, retrying up to {@code RETRY_NUM} times.
+ *
+ * @param req the create-function request to forward
+ * @return the status returned by the config node once {@code updateConfigNodeLeader} reports
+ *     false for it
+ * @throws TException with {@code MSG_RECONNECTION_FAIL} when every attempt has been used up
+ */
+ @Override
+ public TSStatus createFunction(TCreateFunctionReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.createFunction(req);
+ // presumably updateConfigNodeLeader returns true when the status redirects to another
+ // leader, in which case the call is retried -- verify against its definition
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ // The exception is intentionally not rethrown: the leader is cleared and the call retried.
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
public static class Factory extends BaseClientFactory<PartitionRegionId, ConfigNodeClient> {
public Factory(
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5b03bbfaa0..adf1e404a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -252,6 +252,10 @@ public class IoTDBConfig {
private String udfDir =
IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.UDF_FOLDER_NAME;
+ /**
+ * External temporary lib directory for storing downloaded JAR files. Defaults to the
+ * {@code TMP_FOLDER_NAME} folder under {@code EXT_FOLDER_NAME}.
+ */
+ private String temporaryLibDir =
+ IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
+
/** External lib directory for trigger, stores user-uploaded JAR files */
private String triggerDir =
IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.TRIGGER_FOLDER_NAME;
@@ -997,6 +1001,7 @@ public class IoTDBConfig {
indexRootFolder = addHomeDir(indexRootFolder);
extDir = addHomeDir(extDir);
udfDir = addHomeDir(udfDir);
+ temporaryLibDir = addHomeDir(temporaryLibDir);
triggerDir = addHomeDir(triggerDir);
mqttDir = addHomeDir(mqttDir);
for (int i = 0; i < walDirs.length; i++) {
@@ -1191,6 +1196,10 @@ public class IoTDBConfig {
return udfDir;
}
+ /** Returns the external temporary lib directory used for storing downloaded JAR files. */
+ public String getTemporaryLibDir() {
+ return temporaryLibDir;
+ }
+
public void setUdfDir(String udfDir) {
this.udfDir = udfDir;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 671edfb68b..ebd9327e71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
@@ -51,6 +52,7 @@ public class ConfigTaskVisitor
return new SetStorageGroupTask(statement);
}
+ @Override
public IConfigTask visitDeleteStorageGroup(
DeleteStorageGroupStatement statement, TaskContext context) {
return new DeleteStorageGroupTask(statement);
@@ -88,5 +90,11 @@ public class ConfigTaskVisitor
return new AuthorizerConfigTask(statement);
}
+ /**
+ * Maps a CREATE FUNCTION statement to its config task.
+ *
+ * @param createFunctionStatement the statement to execute
+ * @param context visitor context; not used by this method
+ * @return a new {@link CreateFunctionTask} wrapping the statement
+ */
+ @Override
+ public IConfigTask visitCreateFunction(
+ CreateFunctionStatement createFunctionStatement, TaskContext context) {
+ return new CreateFunctionTask(createFunctionStatement);
+ }
+
public static class TaskContext {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/CreateFunctionTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/CreateFunctionTask.java
new file mode 100644
index 0000000000..56b95836bf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/CreateFunctionTask.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.stream.Collectors;
+
+ /**
+  * Config task that executes a CREATE FUNCTION statement.
+  *
+  * <p>In cluster mode the request is forwarded to the config node; otherwise the function is
+  * registered directly through the local {@link UDFRegistrationService}.
+  */
+ public class CreateFunctionTask implements IConfigTask {
+
+   private static final Logger LOGGER = LoggerFactory.getLogger(CreateFunctionTask.class);
+   private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+   private final String udfName;
+   private final String className;
+   /** Executable URIs in their string form, as sent over RPC and passed to registration. */
+   private final List<String> uris;
+
+   /**
+    * Captures the function name, class name and URIs of the statement.
+    *
+    * @param createFunctionStatement the parsed CREATE FUNCTION statement
+    */
+   public CreateFunctionTask(CreateFunctionStatement createFunctionStatement) {
+     udfName = createFunctionStatement.getUdfName();
+     className = createFunctionStatement.getClassName();
+     uris =
+         createFunctionStatement.getUris().stream().map(URI::toString).collect(Collectors.toList());
+   }
+
+   /**
+    * Creates the function, choosing the cluster or standalone path from the configuration.
+    *
+    * @param clientManager source of config node clients; only used in cluster mode
+    * @return a future carrying the task result, or the failure as its exception
+    * @throws InterruptedException declared by the interface; not raised by the code in this class
+    */
+   @Override
+   public ListenableFuture<ConfigTaskResult> execute(
+       IClientManager<PartitionRegionId, ConfigNodeClient> clientManager)
+       throws InterruptedException {
+     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+     if (CONFIG.isClusterMode()) {
+       executeCluster(clientManager, future);
+     } else {
+       executeStandalone(future);
+     }
+     return future;
+   }
+
+   /** Forwards the request to the config node and completes the future from its status. */
+   private void executeCluster(
+       IClientManager<PartitionRegionId, ConfigNodeClient> clientManager,
+       SettableFuture<ConfigTaskResult> future) {
+     try (ConfigNodeClient client = clientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+       final TSStatus executionStatus =
+           client.createFunction(new TCreateFunctionReq(udfName, className, uris));
+
+       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+         LOGGER.error(
+             "[{}] Failed to create function {}({}) in config node, URI: {}.",
+             executionStatus,
+             udfName,
+             className,
+             uris);
+         future.setException(new StatementExecutionException(executionStatus));
+       } else {
+         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+       }
+     } catch (TException | IOException e) {
+       // Pass the exception to the logger so the cause and stack trace are not lost.
+       LOGGER.error("Failed to connect to config node.", e);
+       future.setException(e);
+     }
+   }
+
+   /** Registers the function locally and completes the future with the outcome. */
+   private void executeStandalone(SettableFuture<ConfigTaskResult> future) {
+     try {
+       UDFRegistrationService.getInstance()
+           .register(udfName, className, uris, UDFExecutableManager.getInstance(), true);
+       future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+     } catch (Exception e) {
+       final String message =
+           String.format(
+               "Failed to create function %s(%s), URI: %s, because %s.",
+               udfName, className, uris, e.getMessage());
+       LOGGER.error(message, e);
+       future.setException(
+           new StatementExecutionException(
+               new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+                   .setMessage(message)));
+     }
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 8426e8bbfe..f01b7b1a61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -78,6 +78,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
@@ -99,7 +100,9 @@ import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountDevicesContext;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountNodesContext;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountStorageGroupContext;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CountTimeseriesContext;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.CreateFunctionContext;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.ExpressionContext;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser.UriContext;
import org.apache.iotdb.db.qp.sql.IoTDBSqlParserBaseVisitor;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -112,6 +115,8 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
@@ -595,6 +600,30 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
return new CountStorageGroupStatement(path);
}
+ // Create Function
+ /**
+  * Builds a {@link CreateFunctionStatement} from the parsed CREATE FUNCTION context.
+  *
+  * @param ctx parser context holding the function name, class name and URI nodes
+  * @return the statement carrying the parsed name, class name and URIs
+  */
+ @Override
+ public Statement visitCreateFunction(CreateFunctionContext ctx) {
+   final String functionName = parseIdentifier(ctx.udfName.getText());
+   final String functionClassName = parseStringLiteral(ctx.className.getText());
+   final List<URI> executableUris = parseUris(ctx.uri());
+   return new CreateFunctionStatement(functionName, functionClassName, executableUris);
+ }
+
+ /**
+  * Converts the URI nodes of a statement into {@link URI} objects.
+  *
+  * @param uriContexts parser contexts of the URI literals; may be {@code null}
+  * @return a new list with one URI per context, empty when {@code uriContexts} is {@code null}
+  * @throws SemanticException if a literal is not a syntactically valid URI
+  */
+ private List<URI> parseUris(List<UriContext> uriContexts) {
+   final List<URI> parsedUris = new ArrayList<>();
+   if (uriContexts == null) {
+     return parsedUris;
+   }
+   for (UriContext context : uriContexts) {
+     // Keep the raw token text: it is what the error message reports back to the user.
+     final String rawText = context.getText();
+     try {
+       parsedUris.add(new URI(parseStringLiteral(rawText)));
+     } catch (URISyntaxException e) {
+       throw new SemanticException(String.format("'%s' is not a legal URI.", rawText));
+     }
+   }
+   return parsedUris;
+ }
+
// Show Child Paths =====================================================================
@Override
public Statement visitShowChildPaths(IoTDBSqlParser.ShowChildPathsContext ctx) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index bdbd99f766..650d15d160 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountLevelTimeSeriesState
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesByDeviceStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
@@ -126,6 +127,11 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(showTTLStatement, context);
}
+ // UDF
+ /**
+ * Visits a CREATE FUNCTION statement. The default implementation delegates to
+ * {@code visitStatement}.
+ */
+ public R visitCreateFunction(CreateFunctionStatement createFunctionStatement, C context) {
+ return visitStatement(createFunctionStatement, context);
+ }
+
/** Data Manipulation Language (DML) */
// Select Statement
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateFunctionStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateFunctionStatement.java
new file mode 100644
index 0000000000..990c9b7fb1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateFunctionStatement.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.constant.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+
+ /**
+  * Statement object for CREATE FUNCTION, holding the function name, the implementing class name
+  * and the URIs given in the statement.
+  */
+ public class CreateFunctionStatement extends Statement implements IConfigStatement {
+
+   private final String udfName;
+   private final String className;
+   private final List<URI> uris;
+
+   /**
+    * @param udfName name of the function to create
+    * @param className name of the class implementing the function
+    * @param uris URIs given in the statement; stored as passed, without copying
+    */
+   public CreateFunctionStatement(String udfName, String className, List<URI> uris) {
+     this.udfName = udfName;
+     this.className = className;
+     this.uris = uris;
+     statementType = StatementType.CREATE_FUNCTION;
+   }
+
+   /** Returns the name of the function to create. */
+   public String getUdfName() {
+     return udfName;
+   }
+
+   /** Returns the name of the class implementing the function. */
+   public String getClassName() {
+     return className;
+   }
+
+   /** Returns the URIs given in the statement (the same list instance passed in). */
+   public List<URI> getUris() {
+     return uris;
+   }
+
+   /** Dispatches to {@link StatementVisitor#visitCreateFunction}. */
+   @Override
+   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+     return visitor.visitCreateFunction(this, context);
+   }
+
+   /** This statement is classified as a write. */
+   @Override
+   public QueryType getQueryType() {
+     return QueryType.WRITE;
+   }
+
+   /** This statement references no paths. */
+   @Override
+   public List<? extends PartialPath> getPaths() {
+     return Collections.emptyList();
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 05ad28efcf..1fd483360d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.service.StartupChecks;
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
+import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -277,16 +278,8 @@ public class DataNode implements DataNodeMBean {
.getConfig()
.setRpcImplClassName(DataNodeTSIServiceImpl.class.getName());
- registerManager.register(TemporaryQueryDataFileService.getInstance());
- registerManager.register(
- UDFClassLoaderManager.setupAndGetInstance(
- IoTDBDescriptor.getInstance().getConfig().getUdfDir()));
- registerManager.register(
- UDFRegistrationService.setupAndGetInstance(
- IoTDBDescriptor.getInstance().getConfig().getSystemDir()
- + File.separator
- + "udf"
- + File.separator));
+ registerUdfServices();
+
registerManager.register(ReceiverService.getInstance());
registerManager.register(MetricsService.getInstance());
@@ -328,6 +321,23 @@ public class DataNode implements DataNodeMBean {
logger.info("Congratulation, IoTDB DataNode is set up successfully. Now, enjoy yourself!");
}
+ /**
+ * Registers the services used for UDF support with the register manager: the temporary query
+ * data file service, the UDF executable manager, the UDF class loader manager and the UDF
+ * registration service, in that order.
+ *
+ * @throws StartupException declared for the {@code registerManager.register} calls
+ */
+ private void registerUdfServices() throws StartupException {
+ registerManager.register(TemporaryQueryDataFileService.getInstance());
+ // The executable manager is set up with the temporary lib directory and the UDF directory.
+ registerManager.register(
+ UDFExecutableManager.setupAndGetInstance(
+ IoTDBDescriptor.getInstance().getConfig().getTemporaryLibDir(),
+ IoTDBDescriptor.getInstance().getConfig().getUdfDir()));
+ registerManager.register(
+ UDFClassLoaderManager.setupAndGetInstance(
+ IoTDBDescriptor.getInstance().getConfig().getUdfDir()));
+ // The registration service is given the "udf" sub-directory of the system directory.
+ registerManager.register(
+ UDFRegistrationService.setupAndGetInstance(
+ IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+ + File.separator
+ + "udf"
+ + File.separator));
+ }
+
private void initConfigManager() {
long time = System.currentTimeMillis();
IoTDB.configManager.init();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 4c7f08edf0..fb6db492dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
@@ -72,6 +74,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
@@ -354,5 +357,22 @@ public class InternalServiceImpl implements InternalService.Iface {
return consensusImpl.write(consensusGroupId, fragmentInstance).getStatus();
}
+ /**
+  * Registers the requested function on this node through the UDF registration service.
+  *
+  * @param request function name, class name and executable URIs
+  * @return a success status, or an {@code EXECUTE_STATEMENT_ERROR} status carrying the
+  *     exception message when registration fails
+  */
+ @Override
+ public TSStatus createFunction(TCreateFunctionRequest request) {
+   try {
+     final UDFRegistrationService registrationService = UDFRegistrationService.getInstance();
+     registrationService.register(
+         request.getUdfName(),
+         request.getClassName(),
+         request.getUris(),
+         UDFExecutableManager.getInstance(),
+         true);
+   } catch (Exception e) {
+     // Failures are reported to the caller through the status rather than thrown.
+     final TSStatus failure = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+     return failure.setMessage(e.getMessage());
+   }
+   return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
public void handleClientExit() {}
}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index f66e743b96..ab90f5bb33 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -211,6 +211,13 @@ struct TConfigNodeRegisterResp {
3: optional list<common.TConfigNodeLocation> configNodeList
}
+ // UDF
+ // Request of ConfigIService.createFunction: the function name, the name of the
+ // implementing class, and the executable URIs in string form.
+ struct TCreateFunctionReq {
+ 1: required string udfName
+ 2: required string className
+ 3: required list<string> uris
+ }
+
service ConfigIService {
/* DataNode */
@@ -270,4 +277,8 @@ service ConfigIService {
TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req)
common.TSStatus applyConfigNode(common.TConfigNodeLocation configNodeLocation)
+
+ /* UDF */
+
+ common.TSStatus createFunction(TCreateFunctionReq req)
}
\ No newline at end of file
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index d55520a9f4..eaecf45dc7 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -135,7 +135,13 @@ struct TSchemaFetchResponse {
1: required binary serializedSchemaTree
}
-struct TInvalidatePermissionCacheReq{
+// Request of InternalService.createFunction: the function name, the name of the
+// implementing class, and the executable URIs in string form.
+struct TCreateFunctionRequest {
+ 1: required string udfName
+ 2: required string className
+ 3: required list<string> uris
+}
+
+struct TInvalidatePermissionCacheReq {
1: required string username
2: required string roleName
}
@@ -216,6 +222,13 @@ service InternalService {
**/
common.THeartbeatResp getHeartBeat(common.THeartbeatReq req)
+ /**
+ * Config node will create a function on a list of data nodes.
+ *
+ * @param function name, function class name, and executable uris
+ **/
+ common.TSStatus createFunction(TCreateFunctionRequest req)
+
/**
* Config node will invalidate permission Info cache.
*