You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/26 12:30:37 UTC

[iotdb] branch master updated: [IOTDB-4756] Register UDF when restart/register DataNode

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b268472003 [IOTDB-4756] Register UDF when restart/register DataNode
b268472003 is described below

commit b268472003a45f7916351e546f0aad4cddb41bb9
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Wed Oct 26 20:30:30 2022 +0800

    [IOTDB-4756] Register UDF when restart/register DataNode
---
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   9 ++
 .../commons/udf/service/UDFManagementService.java  |   6 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  19 ++++
 .../java/org/apache/iotdb/db/service/DataNode.java | 117 ++++++++++++++++++---
 .../db/service/ResourcesInformationHolder.java     |  12 +++
 .../src/main/thrift/confignode.thrift              |  32 ++++--
 6 files changed, 173 insertions(+), 22 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 624ed79a66..50300e641b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -107,6 +107,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -504,6 +506,12 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return null;
   }
 
+  @Override
+  public TGetUDFJarResp getUDFJar(TGetUDFJarReq req) {
+    // TODO: implementation - presumably delegate to configManager like getTriggerJar does
+    return null;
+  }
+
   @Override
   public TSStatus createTrigger(TCreateTriggerReq req) {
     return configManager.createTrigger(req);
@@ -529,6 +537,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return configManager.getLocationOfStatefulTrigger(triggerName);
   }
 
+  @Override
   public TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req) {
     return configManager.getTriggerJar(req);
   }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
index 6b4ec14a24..47f9821a82 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
@@ -190,7 +190,11 @@ public class UDFManagementService {
     }
   }
 
-  private void doRegister(UDFInformation udfInformation) throws UDFManagementException {
+  /**
+   * Call this method directly only when registering a new DataNode or restarting one; otherwise
+   * call register().
+   */
+  public void doRegister(UDFInformation udfInformation) throws UDFManagementException {
     String functionName = udfInformation.getFunctionName();
     String className = udfInformation.getClassName();
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index fd2907734b..0e99bed42c 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -74,6 +74,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -892,6 +894,7 @@ public class ConfigNodeClient
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  @Override
   public TGetUDFTableResp getUDFTable() throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
       try {
@@ -907,6 +910,22 @@ public class ConfigNodeClient
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  @Override
+  public TGetUDFJarResp getUDFJar(TGetUDFJarReq req) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TGetUDFJarResp resp = client.getUDFJar(req);
+        if (!updateConfigNodeLeader(resp.getStatus())) {
+          return resp;
+        }
+      } catch (TException e) {
+        configLeader = null; // drop the recorded leader before reconnect() runs
+      }
+      reconnect(); // reached after a TException or when updateConfigNodeLeader returned true
+    }
+    throw new TException(MSG_RECONNECTION_FAIL); // all RETRY_NUM attempts were used up
+  }
+
   @Override
   public TSStatus createTrigger(TCreateTriggerReq req) throws TException {
     for (int i = 0; i < 5; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 2e4726fd06..fd6b215800 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -35,12 +35,16 @@ import org.apache.iotdb.commons.service.StartupChecks;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
+import org.apache.iotdb.commons.udf.UDFInformation;
 import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
+import org.apache.iotdb.commons.udf.service.UDFManagementService;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarResp;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
@@ -74,6 +78,7 @@ import org.apache.iotdb.db.trigger.service.TriggerManagementService;
 import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.utils.WALMode;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.udf.api.exception.UDFManagementException;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -201,6 +206,9 @@ public class DataNode implements DataNodeMBean {
         ClusterTemplateManager.getInstance()
             .updateTemplateSetInfo(dataNodeRegisterResp.getTemplateInfo());
 
+        // store udfInformationList
+        getUDFInformationList(dataNodeRegisterResp.getAllUDFInformation());
+
         // store triggerInformationList
         getTriggerInformationList(dataNodeRegisterResp.getAllTriggerInformation());
 
@@ -270,8 +278,8 @@ public class DataNode implements DataNodeMBean {
   }
 
   private void prepareResources() throws StartupException {
-    prepareTriggerResources();
     prepareUDFResources();
+    prepareTriggerResources();
   }
 
   /** register services and set up DataNode */
@@ -407,6 +415,101 @@ public class DataNode implements DataNodeMBean {
     registerManager.register(UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir()));
   }
 
+  private void initUDFRelatedInstance() throws StartupException {
+    try {
+      UDFExecutableManager.setupAndGetInstance(config.getUdfTemporaryLibDir(), config.getUdfDir());
+    } catch (IOException e) {
+      throw new StartupException(e);
+    }
+  }
+
+  /**
+   * Prepares UDF resources at startup: fetches the jars this node lacks (or whose local copy
+   * conflicts) from the config node, then registers every UDF kept in resourcesInformationHolder.
+   *
+   * @throws StartupException if initialization, jar fetching or registration fails
+   */
+  private void prepareUDFResources() throws StartupException {
+    initUDFRelatedInstance();
+    List<UDFInformation> udfInformationList = resourcesInformationHolder.getUDFInformationList();
+    if (udfInformationList == null || udfInformationList.isEmpty()) {
+      return;
+    }
+
+    // get jars from config node, at most getJarNumOfOneRpc() jars per RPC
+    List<UDFInformation> udfNeedJarList = getJarListForUDF();
+    int batchSize = ResourcesInformationHolder.getJarNumOfOneRpc();
+    // advance by exactly one batch so that no UDF between two batches is skipped
+    for (int from = 0; from < udfNeedJarList.size(); from += batchSize) {
+      int to = Math.min(from + batchSize, udfNeedJarList.size());
+      getJarOfUDFs(udfNeedJarList.subList(from, to));
+    }
+
+    // register every UDF received from the config node
+    try {
+      for (UDFInformation udfInformation : udfInformationList) {
+        UDFManagementService.getInstance().doRegister(udfInformation);
+      }
+    } catch (Exception e) {
+      throw new StartupException(e);
+    }
+
+    logger.debug("successfully registered all the UDFs");
+    for (UDFInformation udfInformation :
+        UDFManagementService.getInstance().getAllUDFInformation()) {
+      logger.debug("get udf: {}", udfInformation.getFunctionName());
+    }
+  }
+
+  private void getJarOfUDFs(List<UDFInformation> udfInformationList) throws StartupException {
+    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+      List<String> jarNameList =
+          udfInformationList.stream().map(UDFInformation::getJarName).collect(Collectors.toList());
+      TGetUDFJarResp resp = configNodeClient.getUDFJar(new TGetUDFJarReq(jarNameList));
+      if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+        throw new StartupException("Failed to get UDF jar from config node.");
+      }
+      List<ByteBuffer> jarList = resp.getJarList(); // NOTE(review): same order as jarNameList?
+      for (int i = 0; i < udfInformationList.size(); i++) {
+        UDFExecutableManager.getInstance()
+            .writeToLibDir(jarList.get(i), udfInformationList.get(i).getJarName());
+      }
+    } catch (IOException | TException e) {
+      throw new StartupException(e);
+    }
+  }
+
+  /** List the UDFs whose jar is missing on this node or conflicts with the local jar. */
+  private List<UDFInformation> getJarListForUDF() {
+    List<UDFInformation> res = new ArrayList<>();
+    for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
+      // jar does not exist, add current udfInformation to list
+      if (!UDFExecutableManager.getInstance().hasFileUnderLibRoot(udfInformation.getJarName())) {
+        res.add(udfInformation);
+      } else {
+        try {
+          // local jar has conflicts with jar on config node, add current udfInformation to list
+          if (UDFManagementService.getInstance().isLocalJarConflicted(udfInformation)) {
+            res.add(udfInformation);
+          }
+        } catch (UDFManagementException e) {
+          res.add(udfInformation); // the conflict check itself failed; list the jar for fetching
+        }
+      }
+    }
+    return res;
+  }
+
+  private void getUDFInformationList(List<ByteBuffer> allUDFInformation) {
+    // nothing to store when no UDF information was passed in
+    if (allUDFInformation == null || allUDFInformation.isEmpty()) {
+      return;
+    }
+    // deserialize every entry in order and hand the result to the holder
+    resourcesInformationHolder.setUDFInformationList(
+        allUDFInformation.stream().map(UDFInformation::deserialize).collect(Collectors.toList()));
+  }
+
   private void initTriggerRelatedInstance() throws StartupException {
     try {
       TriggerExecutableManager.setupAndGetInstance(
@@ -516,18 +619,6 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
-  private void initUDFRelatedInstance() throws StartupException {
-    try {
-      UDFExecutableManager.setupAndGetInstance(config.getUdfTemporaryLibDir(), config.getUdfDir());
-    } catch (IOException e) {
-      throw new StartupException(e);
-    }
-  }
-
-  private void prepareUDFResources() throws StartupException {
-    initUDFRelatedInstance();
-  }
-
   private void initSchemaEngine() {
     long time = System.currentTimeMillis();
     SchemaEngine.getInstance().init();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java b/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
index 480b78e008..f26436c1d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
@@ -20,12 +20,16 @@
 package org.apache.iotdb.db.service;
 
 import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.commons.udf.UDFInformation;
 
 import java.util.List;
 
 public class ResourcesInformationHolder {
   private static final int JAR_NUM_OF_ONE_RPC = 10;
 
+  /** store the list when registering in config node for preparing udf related resources */
+  private List<UDFInformation> udfInformationList;
+
   /** store the list when registering in config node for preparing trigger related resources */
   private List<TriggerInformation> triggerInformationList;
 
@@ -33,6 +37,14 @@ public class ResourcesInformationHolder {
     return JAR_NUM_OF_ONE_RPC;
   }
 
+  public List<UDFInformation> getUDFInformationList() {
+    return udfInformationList;
+  }
+
+  public void setUDFInformationList(List<UDFInformation> udfInformationList) {
+    this.udfInformationList = udfInformationList;
+  }
+
   public List<TriggerInformation> getTriggerInformationList() {
     return triggerInformationList;
   }
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 670e80ad7f..3a5be47ff4 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -37,6 +37,7 @@ struct TDataNodeRegisterResp {
   5: optional binary templateInfo
   6: optional TRatisConfig ratisConfig
   7: optional list<binary> allTriggerInformation
+  8: optional list<binary> allUDFInformation
 }
 
 struct TGlobalConfig {
@@ -320,6 +321,16 @@ struct TGetUDFTableResp {
   2: required list<binary> allUDFInformation
 }
 
+// Get jars of the corresponding UDF
+struct TGetUDFJarReq {
+  1: required list<string> jarNameList
+}
+
+struct TGetUDFJarResp {
+  1: required common.TSStatus status
+  2: required list<binary> jarList
+}
+
 // Trigger
 enum TTriggerState {
   // The intermediate state of Create trigger, the trigger need to create has not yet activated on any DataNodes.
@@ -361,14 +372,6 @@ struct TGetTriggerTableResp {
   2: required list<binary> allTriggerInformation
 }
 
-// Show cluster
-struct TShowClusterResp {
-  1: required common.TSStatus status
-  2: required list<common.TConfigNodeLocation> configNodeList
-  3: required list<common.TDataNodeLocation> dataNodeList
-  4: required map<i32, string> nodeStatus
-}
-
 // Get jars of the corresponding trigger
 struct TGetTriggerJarReq {
   1: required list<string> jarNameList
@@ -379,6 +382,14 @@ struct TGetTriggerJarResp {
   2: required list<binary> jarList
 }
 
+// Show cluster
+struct TShowClusterResp {
+  1: required common.TSStatus status
+  2: required list<common.TConfigNodeLocation> configNodeList
+  3: required list<common.TDataNodeLocation> dataNodeList
+  4: required map<i32, string> nodeStatus
+}
+
 // Show datanodes
 struct TDataNodeInfo {
   1: required i32 dataNodeId
@@ -792,6 +803,11 @@ service IConfigNodeRPCService {
    */
   TGetUDFTableResp getUDFTable()
 
+  /**
+   * Return the UDF jar list of the jar name list
+   */
+  TGetUDFJarResp getUDFJar(TGetUDFJarReq req)
+
   // ======================================================
   // Trigger
   // ======================================================