You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/26 12:30:37 UTC
[iotdb] branch master updated: [IOTDB-4756] Register UDF when restart/register DataNode
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b268472003 [IOTDB-4756] Register UDF when restart/register DataNode
b268472003 is described below
commit b268472003a45f7916351e546f0aad4cddb41bb9
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Wed Oct 26 20:30:30 2022 +0800
[IOTDB-4756] Register UDF when restart/register DataNode
---
.../thrift/ConfigNodeRPCServiceProcessor.java | 9 ++
.../commons/udf/service/UDFManagementService.java | 6 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 19 ++++
.../java/org/apache/iotdb/db/service/DataNode.java | 117 ++++++++++++++++++---
.../db/service/ResourcesInformationHolder.java | 12 +++
.../src/main/thrift/confignode.thrift | 32 ++++--
6 files changed, 173 insertions(+), 22 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 624ed79a66..50300e641b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -107,6 +107,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -504,6 +506,12 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return null;
}
+ @Override
+ public TGetUDFJarResp getUDFJar(TGetUDFJarReq req) {
+ // TODO: not implemented yet; req is ignored and null is returned instead of a TGetUDFJarResp
+ return null;
+ }
+
@Override
public TSStatus createTrigger(TCreateTriggerReq req) {
return configManager.createTrigger(req);
@@ -529,6 +537,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return configManager.getLocationOfStatefulTrigger(triggerName);
}
+ @Override
public TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req) {
return configManager.getTriggerJar(req);
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
index 6b4ec14a24..47f9821a82 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
@@ -190,7 +190,11 @@ public class UDFManagementService {
}
}
- private void doRegister(UDFInformation udfInformation) throws UDFManagementException {
+ /**
+ * Only call this method directly for registering new data node, otherwise you need to call
+ * register().
+ */
+ public void doRegister(UDFInformation udfInformation) throws UDFManagementException {
String functionName = udfInformation.getFunctionName();
String className = udfInformation.getClassName();
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index fd2907734b..0e99bed42c 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -74,6 +74,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -892,6 +894,7 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
public TGetUDFTableResp getUDFTable() throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
try {
@@ -907,6 +910,22 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TGetUDFJarResp getUDFJar(TGetUDFJarReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) { // at most RETRY_NUM attempts, with reconnect() in between
+ try {
+ TGetUDFJarResp resp = client.getUDFJar(req);
+ if (!updateConfigNodeLeader(resp.getStatus())) { // NOTE(review): presumably false means no leader change is needed - confirm
+ return resp;
+ }
+ } catch (TException e) {
+ configLeader = null; // the exception is not rethrown here; the loop reconnects and retries
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL); // every attempt failed or was redirected
+ }
+
@Override
public TSStatus createTrigger(TCreateTriggerReq req) throws TException {
for (int i = 0; i < 5; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 2e4726fd06..fd6b215800 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -35,12 +35,16 @@ import org.apache.iotdb.commons.service.StartupChecks;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
+import org.apache.iotdb.commons.udf.UDFInformation;
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.udf.service.UDFManagementService;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarResp;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
@@ -74,6 +78,7 @@ import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.udf.api.exception.UDFManagementException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -201,6 +206,9 @@ public class DataNode implements DataNodeMBean {
ClusterTemplateManager.getInstance()
.updateTemplateSetInfo(dataNodeRegisterResp.getTemplateInfo());
+ // store udfInformationList
+ getUDFInformationList(dataNodeRegisterResp.getAllUDFInformation());
+
// store triggerInformationList
getTriggerInformationList(dataNodeRegisterResp.getAllTriggerInformation());
@@ -270,8 +278,8 @@ public class DataNode implements DataNodeMBean {
}
private void prepareResources() throws StartupException {
- prepareTriggerResources();
prepareUDFResources();
+ prepareTriggerResources();
}
/** register services and set up DataNode */
@@ -407,6 +415,101 @@ public class DataNode implements DataNodeMBean {
registerManager.register(UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir()));
}
+ private void initUDFRelatedInstance() throws StartupException { // sets up UDFExecutableManager
+ try {
+ UDFExecutableManager.setupAndGetInstance(config.getUdfTemporaryLibDir(), config.getUdfDir());
+ } catch (IOException e) {
+ throw new StartupException(e); // report the I/O failure as a startup failure, keeping the cause
+ }
+ }
+
+ /**
+ * Initializes the UDF executable manager, fetches every jar returned by getJarListForUDF()
+ * from the config node, then registers all UDFs held in resourcesInformationHolder.
+ *
+ * @throws StartupException if initialization, jar fetching or registration fails
+ */
+ private void prepareUDFResources() throws StartupException {
+ initUDFRelatedInstance();
+ List<UDFInformation> udfInformationList = resourcesInformationHolder.getUDFInformationList();
+ if (udfInformationList == null || udfInformationList.isEmpty()) {
+ return;
+ }
+
+ // get jars from config node, at most getJarNumOfOneRpc() jars per RPC
+ List<UDFInformation> udfNeedJarList = getJarListForUDF();
+ int batchSize = ResourcesInformationHolder.getJarNumOfOneRpc();
+ // advance by exactly one batch; stepping one further would skip the UDF between two batches
+ for (int index = 0; index < udfNeedJarList.size(); index += batchSize) {
+ int end = Math.min(index + batchSize, udfNeedJarList.size());
+ getJarOfUDFs(new ArrayList<>(udfNeedJarList.subList(index, end)));
+ }
+
+ // create instances of UDFs and do registration
+ try {
+ for (UDFInformation udfInformation : udfInformationList) {
+ UDFManagementService.getInstance().doRegister(udfInformation);
+ }
+ } catch (Exception e) {
+ throw new StartupException(e);
+ }
+
+ logger.debug("successfully registered all the UDFs");
+ for (UDFInformation udfInformation :
+ UDFManagementService.getInstance().getAllUDFInformation()) {
+ logger.debug("get udf: {}", udfInformation.getFunctionName());
+ }
+ }
+
+ /**
+ * Requests the jars of the given UDFs from the config node in a single RPC and hands each
+ * returned jar to UDFExecutableManager.writeToLibDir together with its jar name.
+ *
+ * @param udfInformationList UDFs whose jars should be fetched; jars are matched by position
+ * @throws StartupException if the RPC fails, the config node reports an execution error, or
+ *     fewer jars than requested are returned
+ */
+ private void getJarOfUDFs(List<UDFInformation> udfInformationList) throws StartupException {
+ try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+ List<String> jarNameList =
+ udfInformationList.stream().map(UDFInformation::getJarName).collect(Collectors.toList());
+ TGetUDFJarResp resp = configNodeClient.getUDFJar(new TGetUDFJarReq(jarNameList));
+ if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+ throw new StartupException("Failed to get UDF jar from config node.");
+ }
+ List<ByteBuffer> jarList = resp.getJarList();
+ // jars are paired with UDFs by index, so a missing or short list must fail the startup
+ // explicitly instead of surfacing as an unchecked NPE / IndexOutOfBoundsException below
+ if (jarList == null || jarList.size() < udfInformationList.size()) {
+ throw new StartupException(
+ "The number of UDF jars returned by config node is less than requested.");
+ }
+ for (int i = 0; i < udfInformationList.size(); i++) {
+ UDFExecutableManager.getInstance()
+ .writeToLibDir(jarList.get(i), udfInformationList.get(i).getJarName());
+ }
+ } catch (IOException | TException e) {
+ throw new StartupException(e);
+ }
+ }
+
+ /** Generate a list for UDFs whose jar is absent, conflicted or uncheckable on this node. */
+ private List<UDFInformation> getJarListForUDF() {
+ List<UDFInformation> res = new ArrayList<>();
+ for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
+ // jar does not exist, add current udfInformation to list
+ if (!UDFExecutableManager.getInstance().hasFileUnderLibRoot(udfInformation.getJarName())) {
+ res.add(udfInformation);
+ } else {
+ try {
+ // local jar has conflicts with jar on config node, add current udfInformation to list
+ if (UDFManagementService.getInstance().isLocalJarConflicted(udfInformation)) {
+ res.add(udfInformation);
+ }
+ } catch (UDFManagementException e) {
+ res.add(udfInformation); // the conflict check itself failed: add it to the list as well
+ }
+ }
+ }
+ return res;
+ }
+
+ private void getUDFInformationList(List<ByteBuffer> allUDFInformation) { // despite the name, returns nothing: it stores the list
+ if (allUDFInformation != null && !allUDFInformation.isEmpty()) { // null or empty input leaves the holder untouched
+ List<UDFInformation> list = new ArrayList<>();
+ for (ByteBuffer UDFInformationByteBuffer : allUDFInformation) {
+ list.add(UDFInformation.deserialize(UDFInformationByteBuffer));
+ }
+ resourcesInformationHolder.setUDFInformationList(list); // read back later via getUDFInformationList()
+ }
+ }
+
private void initTriggerRelatedInstance() throws StartupException {
try {
TriggerExecutableManager.setupAndGetInstance(
@@ -516,18 +619,6 @@ public class DataNode implements DataNodeMBean {
}
}
- private void initUDFRelatedInstance() throws StartupException {
- try {
- UDFExecutableManager.setupAndGetInstance(config.getUdfTemporaryLibDir(), config.getUdfDir());
- } catch (IOException e) {
- throw new StartupException(e);
- }
- }
-
- private void prepareUDFResources() throws StartupException {
- initUDFRelatedInstance();
- }
-
private void initSchemaEngine() {
long time = System.currentTimeMillis();
SchemaEngine.getInstance().init();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java b/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
index 480b78e008..f26436c1d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
@@ -20,12 +20,16 @@
package org.apache.iotdb.db.service;
import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.commons.udf.UDFInformation;
import java.util.List;
public class ResourcesInformationHolder {
private static final int JAR_NUM_OF_ONE_RPC = 10;
+ /** store the list when registering in config node for preparing udf related resources */
+ private List<UDFInformation> udfInformationList;
+
/** store the list when registering in config node for preparing trigger related resources */
private List<TriggerInformation> triggerInformationList;
@@ -33,6 +37,14 @@ public class ResourcesInformationHolder {
return JAR_NUM_OF_ONE_RPC;
}
+ public List<UDFInformation> getUDFInformationList() {
+ return udfInformationList; // NOTE(review): appears to be null until setUDFInformationList is called - confirm
+ }
+
+ public void setUDFInformationList(List<UDFInformation> udfInformationList) {
+ this.udfInformationList = udfInformationList; // keeps the caller's list reference, no copy is made
+ }
+
public List<TriggerInformation> getTriggerInformationList() {
return triggerInformationList;
}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 670e80ad7f..3a5be47ff4 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -37,6 +37,7 @@ struct TDataNodeRegisterResp {
5: optional binary templateInfo
6: optional TRatisConfig ratisConfig
7: optional list<binary> allTriggerInformation
+ 8: optional list<binary> allUDFInformation
}
struct TGlobalConfig {
@@ -320,6 +321,16 @@ struct TGetUDFTableResp {
2: required list<binary> allUDFInformation
}
+// Get jars of the corresponding UDFs
+struct TGetUDFJarReq {
+ 1: required list<string> jarNameList
+}
+
+struct TGetUDFJarResp {
+ 1: required common.TSStatus status
+ 2: required list<binary> jarList
+}
+
// Trigger
enum TTriggerState {
// The intermediate state of Create trigger, the trigger need to create has not yet activated on any DataNodes.
@@ -361,14 +372,6 @@ struct TGetTriggerTableResp {
2: required list<binary> allTriggerInformation
}
-// Show cluster
-struct TShowClusterResp {
- 1: required common.TSStatus status
- 2: required list<common.TConfigNodeLocation> configNodeList
- 3: required list<common.TDataNodeLocation> dataNodeList
- 4: required map<i32, string> nodeStatus
-}
-
// Get jars of the corresponding trigger
struct TGetTriggerJarReq {
1: required list<string> jarNameList
@@ -379,6 +382,14 @@ struct TGetTriggerJarResp {
2: required list<binary> jarList
}
+// Show cluster
+struct TShowClusterResp {
+ 1: required common.TSStatus status
+ 2: required list<common.TConfigNodeLocation> configNodeList
+ 3: required list<common.TDataNodeLocation> dataNodeList
+ 4: required map<i32, string> nodeStatus
+}
+
// Show datanodes
struct TDataNodeInfo {
1: required i32 dataNodeId
@@ -792,6 +803,11 @@ service IConfigNodeRPCService {
*/
TGetUDFTableResp getUDFTable()
+ /**
+ * Return the UDF jars corresponding to the given jar name list
+ */
+ TGetUDFJarResp getUDFJar(TGetUDFJarReq req)
+
// ======================================================
// Trigger
// ======================================================