You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/28 10:30:50 UTC
[iotdb] branch master updated: [IOTDB-4774]Register/recover UDF when registering/restart a datanode (ConfigNode Process) (#7751)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3648dbea7a [IOTDB-4774]Register/recover UDF when registering/restart a datanode (ConfigNode Process) (#7751)
3648dbea7a is described below
commit 3648dbea7acadd54c3fa9ce06fe014d274a44a91
Author: Weihao Li <60...@users.noreply.github.com>
AuthorDate: Fri Oct 28 18:30:42 2022 +0800
[IOTDB-4774]Register/recover UDF when registering/restart a datanode (ConfigNode Process) (#7751)
---
.../consensus/request/ConfigPhysicalPlan.java | 4 ++
.../consensus/request/ConfigPhysicalPlanType.java | 3 +-
.../consensus/request/read/GetUDFJarPlan.java | 68 ++++++++++++++++++++++
.../consensus/response/DataNodeRegisterResp.java | 6 ++
.../response/{TriggerJarResp.java => JarResp.java} | 10 ++--
.../iotdb/confignode/manager/ConfigManager.java | 19 ++++--
.../apache/iotdb/confignode/manager/IManager.java | 8 ++-
.../iotdb/confignode/manager/TriggerManager.java | 12 ++--
.../iotdb/confignode/manager/UDFManager.java | 25 ++++++++
.../iotdb/confignode/persistence/TriggerInfo.java | 8 +--
.../iotdb/confignode/persistence/UDFInfo.java | 24 ++++++++
.../persistence/executor/ConfigPlanExecutor.java | 3 +
.../thrift/ConfigNodeRPCServiceProcessor.java | 13 ++---
.../request/ConfigPhysicalPlanSerDeTest.java | 13 +++++
.../commons/udf/service/UDFClassLoaderManager.java | 5 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 14 ++---
.../java/org/apache/iotdb/db/service/DataNode.java | 11 ++--
.../src/main/thrift/confignode.thrift | 20 ++-----
18 files changed, 205 insertions(+), 61 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 828bd3db0d..d6712b494a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTrigger
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetUDFJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
@@ -335,6 +336,9 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case GetFunctionTable:
req = new GetFunctionTablePlan();
break;
+ case GetFunctionJar:
+ req = new GetUDFJarPlan();
+ break;
default:
throw new IOException("unknown PhysicalPlan type: " + typeNum);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index dc64311f05..2b3a0226eb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -100,5 +100,6 @@ public enum ConfigPhysicalPlanType {
GetTriggerLocation,
GetTemplateSetInfo,
UpdateDataNodeConfiguration,
- GetFunctionTable
+ GetFunctionTable,
+ GetFunctionJar
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetUDFJarPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetUDFJarPlan.java
new file mode 100644
index 0000000000..d9e571e57a
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetUDFJarPlan.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.read;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Read plan that carries the names of the UDF jars a caller wants to fetch. */
+public class GetUDFJarPlan extends ConfigPhysicalPlan {
+
+  /** Names of the requested jars; remains null after the no-arg constructor until deserialized. */
+  private List<String> jarNames;
+
+  /** Creates a plan without jar names; {@code deserializeImpl} fills them in later. */
+  public GetUDFJarPlan() {
+    super(ConfigPhysicalPlanType.GetFunctionJar);
+  }
+
+  /**
+   * Creates a plan for the given jars.
+   *
+   * @param jarNames names of the jars to request (the list is stored as-is, not copied)
+   */
+  public GetUDFJarPlan(List<String> jarNames) {
+    super(ConfigPhysicalPlanType.GetFunctionJar);
+    this.jarNames = jarNames;
+  }
+
+  /** @return the jar names held by this plan */
+  public List<String> getJarNames() {
+    return jarNames;
+  }
+
+  /**
+   * Writes the plan type ordinal, then the number of jar names, then each jar name.
+   *
+   * @param stream destination of the serialized plan
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeInt(ConfigPhysicalPlanType.GetFunctionJar.ordinal());
+
+    ReadWriteIOUtils.write(jarNames.size(), stream);
+    for (String jarName : jarNames) {
+      ReadWriteIOUtils.write(jarName, stream);
+    }
+  }
+
+  /**
+   * Reads the jar-name count followed by that many names. The type ordinal written by {@code
+   * serializeImpl} is not read here; presumably the plan factory consumes it first.
+   *
+   * @param buffer source positioned at the jar-name count
+   * @throws IOException declared by the overridden method
+   */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    int size = ReadWriteIOUtils.readInt(buffer);
+    jarNames = new ArrayList<>(size);
+    while (size > 0) {
+      jarNames.add(ReadWriteIOUtils.readString(buffer));
+      size--;
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
index 562145610b..33aa3e4126 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
@@ -38,6 +38,7 @@ public class DataNodeRegisterResp implements DataSet {
private TRatisConfig ratisConfig;
private byte[] templateInfo;
private List<ByteBuffer> allTriggerInformation;
+ private List<ByteBuffer> allUDFInformation;
public DataNodeRegisterResp() {
this.dataNodeId = null;
@@ -80,6 +81,10 @@ public class DataNodeRegisterResp implements DataSet {
this.allTriggerInformation = triggerInformation;
}
+ /**
+  * Stores the UDF information buffers that convertToRpcDataNodeRegisterResp() copies into the
+  * RPC response.
+  *
+  * @param allUDFInformation UDF information buffers to hand to the registering DataNode
+  */
+ public void setAllUDFInformation(List<ByteBuffer> allUDFInformation) {
+ this.allUDFInformation = allUDFInformation;
+ }
+
public TDataNodeRegisterResp convertToRpcDataNodeRegisterResp() {
TDataNodeRegisterResp resp = new TDataNodeRegisterResp();
resp.setStatus(status);
@@ -93,6 +98,7 @@ public class DataNodeRegisterResp implements DataSet {
resp.setTemplateInfo(templateInfo);
resp.setRatisConfig(ratisConfig);
resp.setAllTriggerInformation(allTriggerInformation);
+ resp.setAllUDFInformation(allUDFInformation);
}
return resp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TriggerJarResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/JarResp.java
similarity index 80%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TriggerJarResp.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/JarResp.java
index fe05faaa3e..59a8a56382 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TriggerJarResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/JarResp.java
@@ -20,20 +20,20 @@
package org.apache.iotdb.confignode.consensus.response;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.consensus.common.DataSet;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-public class TriggerJarResp implements DataSet {
+public class JarResp implements DataSet {
private TSStatus status;
private final List<ByteBuffer> jarList;
- public TriggerJarResp(TSStatus status, List<ByteBuffer> jarList) {
+ public JarResp(TSStatus status, List<ByteBuffer> jarList) {
this.status = status;
this.jarList = jarList;
}
@@ -46,7 +46,7 @@ public class TriggerJarResp implements DataSet {
this.status = status;
}
- public TGetTriggerJarResp convertToThriftResponse() throws IOException {
- return new TGetTriggerJarResp(status, jarList);
+ public TGetJarInListResp convertToThriftResponse() throws IOException {
+ return new TGetJarInListResp(status, jarList);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index d66d339dd8..a882f6ec1b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -100,6 +100,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
@@ -108,8 +110,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -237,13 +237,16 @@ public class ConfigManager implements IManager {
DataNodeRegisterResp dataSet;
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
triggerManager.getTriggerInfo().acquireTriggerTableLock();
+ udfManager.getUdfInfo().acquireUDFTableLock();
try {
dataSet = (DataNodeRegisterResp) nodeManager.registerDataNode(registerDataNodePlan);
dataSet.setTemplateInfo(clusterSchemaManager.getAllTemplateSetInfo());
dataSet.setTriggerInformation(
triggerManager.getTriggerTable(false).getAllTriggerInformation());
+ dataSet.setAllUDFInformation(udfManager.getUDFTable().getAllUDFInformation());
} finally {
triggerManager.getTriggerInfo().releaseTriggerTableLock();
+ udfManager.getUdfInfo().releaseUDFTableLock();
}
} else {
dataSet = new DataNodeRegisterResp();
@@ -818,6 +821,14 @@ public class ConfigManager implements IManager {
: new TGetUDFTableResp(status, Collections.emptyList());
}
+ @Override
+ public TGetJarInListResp getUDFJar(TGetJarInListReq req) {
+   // confirmLeader() must report success before the request is handed to the UDF manager;
+   // otherwise its status is returned together with an empty jar list.
+   TSStatus leaderStatus = confirmLeader();
+   if (leaderStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+     return new TGetJarInListResp(leaderStatus, Collections.emptyList());
+   }
+   return udfManager.getUDFJar(req);
+ }
+
@Override
public TSStatus createTrigger(TCreateTriggerReq req) {
TSStatus status = confirmLeader();
@@ -859,11 +870,11 @@ public class ConfigManager implements IManager {
}
@Override
- public TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req) {
+ public TGetJarInListResp getTriggerJar(TGetJarInListReq req) {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
? triggerManager.getTriggerJar(req)
- : new TGetTriggerJarResp(status, Collections.emptyList());
+ : new TGetJarInListResp(status, Collections.emptyList());
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index ef8976367e..ac3f57483b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -60,6 +60,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
@@ -68,8 +70,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -317,6 +317,8 @@ public interface IManager {
TGetUDFTableResp getUDFTable();
+ TGetJarInListResp getUDFJar(TGetJarInListReq req);
+
/** Create trigger */
TSStatus createTrigger(TCreateTriggerReq req);
@@ -333,7 +335,7 @@ public interface IManager {
TGetLocationForTriggerResp getLocationOfStatefulTrigger(String triggerName);
/** Get Trigger jar */
- TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req);
+ TGetJarInListResp getTriggerJar(TGetJarInListReq req);
/** Merge on all DataNodes */
TSStatus merge();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
index 0f3a263150..381ac62cbf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@ -33,17 +33,17 @@ import org.apache.iotdb.confignode.consensus.request.read.GetTriggerLocationPlan
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
+import org.apache.iotdb.confignode.consensus.response.JarResp;
import org.apache.iotdb.confignode.consensus.response.TransferringTriggersResp;
-import org.apache.iotdb.confignode.consensus.response.TriggerJarResp;
import org.apache.iotdb.confignode.consensus.response.TriggerLocationResp;
import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.persistence.TriggerInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
@@ -143,9 +143,9 @@ public class TriggerManager {
.convertToThriftResponse();
}
- public TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req) {
+ public TGetJarInListResp getTriggerJar(TGetJarInListReq req) {
try {
- return ((TriggerJarResp)
+ return ((JarResp)
configManager
.getConsensusManager()
.read(new GetTriggerJarPlan(req.getJarNameList()))
@@ -153,7 +153,7 @@ public class TriggerManager {
.convertToThriftResponse();
} catch (IOException e) {
LOGGER.error("Fail to get TriggerJar", e);
- return new TGetTriggerJarResp(
+ return new TGetJarInListResp(
new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage(e.getMessage()),
Collections.emptyList());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index b2a6dc6497..b122630cd7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -26,11 +26,15 @@ import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.consensus.request.read.GetFunctionTablePlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetUDFJarPlan;
import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan;
import org.apache.iotdb.confignode.consensus.response.FunctionTableResp;
+import org.apache.iotdb.confignode.consensus.response.JarResp;
import org.apache.iotdb.confignode.persistence.UDFInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionInstanceReq;
@@ -59,6 +63,10 @@ public class UDFManager {
this.udfInfo = udfInfo;
}
+ /** Returns the {@link UDFInfo} held by this manager. */
+ public UDFInfo getUdfInfo() {
+ return udfInfo;
+ }
+
public TSStatus createFunction(TCreateFunctionReq req) {
udfInfo.acquireUDFTableLock();
try {
@@ -161,4 +169,21 @@ public class UDFManager {
Collections.emptyList());
}
}
+
+ /**
+  * Fetches the requested UDF jars through a consensus read of a {@link GetUDFJarPlan}.
+  *
+  * @param req request carrying the names of the jars to fetch
+  * @return the thrift response converted from the {@link JarResp} data set; if the conversion
+  *     throws an {@link IOException}, a response with an EXECUTE_STATEMENT_ERROR status, the
+  *     exception message and an empty jar list
+  */
+ public TGetJarInListResp getUDFJar(TGetJarInListReq req) {
+   try {
+     return ((JarResp)
+             configManager
+                 .getConsensusManager()
+                 .read(new GetUDFJarPlan(req.getJarNameList()))
+                 .getDataset())
+         .convertToThriftResponse();
+   } catch (IOException e) {
+     // This path serves UDF jars, so the log must not point at the trigger code path.
+     LOGGER.error("Fail to get UDFJar", e);
+     return new TGetJarInListResp(
+         new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+             .setMessage(e.getMessage()),
+         Collections.emptyList());
+   }
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
index 9f283d97c1..6b1dea30c4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@ -38,8 +38,8 @@ import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTrigger
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
+import org.apache.iotdb.confignode.consensus.response.JarResp;
import org.apache.iotdb.confignode.consensus.response.TransferringTriggersResp;
-import org.apache.iotdb.confignode.consensus.response.TriggerJarResp;
import org.apache.iotdb.confignode.consensus.response.TriggerLocationResp;
import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
@@ -193,7 +193,7 @@ public class TriggerInfo implements SnapshotProcessor {
}
}
- public TriggerJarResp getTriggerJar(GetTriggerJarPlan physicalPlan) {
+ public JarResp getTriggerJar(GetTriggerJarPlan physicalPlan) {
List<ByteBuffer> jarList = new ArrayList<>();
try {
for (String jarName : physicalPlan.getJarNames()) {
@@ -203,12 +203,12 @@ public class TriggerInfo implements SnapshotProcessor {
}
} catch (Exception e) {
LOGGER.error("Get TriggerJar failed", e);
- return new TriggerJarResp(
+ return new JarResp(
new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage("Get TriggerJar failed, because " + e.getMessage()),
Collections.emptyList());
}
- return new TriggerJarResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList);
+ return new JarResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList);
}
public TransferringTriggersResp getTransferringTriggers(GetTransferringTriggersPlan req) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
index 8c8b5d9918..7b48fde9c3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java
@@ -20,15 +20,18 @@
package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.executable.ExecutableManager;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.udf.UDFInformation;
import org.apache.iotdb.commons.udf.UDFTable;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.read.GetUDFJarPlan;
import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan;
import org.apache.iotdb.confignode.consensus.response.FunctionTableResp;
+import org.apache.iotdb.confignode.consensus.response.JarResp;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.udf.api.exception.UDFManagementException;
@@ -39,7 +42,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
@@ -132,6 +138,24 @@ public class UDFInfo implements SnapshotProcessor {
udfTable.getAllNonBuiltInUDFInformation());
}
+ /**
+  * Reads every jar named in the plan into a byte buffer.
+  *
+  * @param physicalPlan plan holding the names of the jars to read
+  * @return a success response with one buffer per requested jar, in request order; if anything
+  *     throws while reading, an EXECUTE_STATEMENT_ERROR response with an empty list
+  */
+ public JarResp getUDFJar(GetUDFJarPlan physicalPlan) {
+   List<ByteBuffer> jarBuffers = new ArrayList<>();
+   try {
+     for (String name : physicalPlan.getJarNames()) {
+       // Resolve the jar name via the UDF executable manager, then load the file content.
+       String jarPath = UDFExecutableManager.getInstance().getFileStringUnderLibRootByName(name);
+       jarBuffers.add(ExecutableManager.transferToBytebuffer(jarPath));
+     }
+   } catch (Exception e) {
+     LOGGER.error("Get UDF_Jar failed", e);
+     TSStatus failure =
+         new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+             .setMessage("Get UDF_Jar failed, because " + e.getMessage());
+     return new JarResp(failure, Collections.emptyList());
+   }
+   return new JarResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarBuffers);
+ }
+
public TSStatus dropFunction(DropFunctionPlan req) {
String udfName = req.getFunctionName();
if (udfTable.containsUDF(udfName)) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index c52bd02766..159e008c83 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTrigger
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetUDFJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan;
@@ -206,6 +207,8 @@ public class ConfigPlanExecutor {
return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req);
case GetFunctionTable:
return udfInfo.getUDFTable();
+ case GetFunctionJar:
+ return udfInfo.getUDFJar((GetUDFJarPlan) req);
default:
throw new UnknownPhysicalPlanTypeException(req.getType());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 7a385760c1..e6192bfda7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -95,6 +95,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
@@ -106,11 +108,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -518,9 +516,8 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
}
@Override
- public TGetUDFJarResp getUDFJar(TGetUDFJarReq req) {
- // todo: implementation
- return null;
+ public TGetJarInListResp getUDFJar(TGetJarInListReq req) {
+ return configManager.getUDFJar(req);
}
@Override
@@ -549,7 +546,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
}
@Override
- public TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req) {
+ public TGetJarInListResp getTriggerJar(TGetJarInListReq req) {
return configManager.getTriggerJar(req);
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 52527f3ef2..1cddc7720e 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTrigger
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetUDFJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
@@ -1206,4 +1207,16 @@ public class ConfigPhysicalPlanSerDeTest {
ConfigPhysicalPlan.Factory.create(getUDFTablePlan0.serializeToByteBuffer())
instanceof GetFunctionTablePlan);
}
+
+ @Test
+ public void GetUDFJarPlanTest() throws IOException {
+   // Round-trip a plan through serialization and check that the jar names survive unchanged.
+   List<String> requestedJars = new ArrayList<>();
+   requestedJars.add("test1");
+   requestedJars.add("test2");
+   GetUDFJarPlan original = new GetUDFJarPlan(requestedJars);
+
+   ConfigPhysicalPlan restored =
+       ConfigPhysicalPlan.Factory.create(original.serializeToByteBuffer());
+   GetUDFJarPlan restoredPlan = (GetUDFJarPlan) restored;
+   Assert.assertEquals(original.getJarNames(), restoredPlan.getJarNames());
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
index 9d98576753..c1192a4e26 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
@@ -74,7 +74,9 @@ public class UDFClassLoaderManager implements IService {
public UDFClassLoader updateAndGetActiveClassLoader() throws IOException {
UDFClassLoader deprecatedClassLoader = activeClassLoader;
activeClassLoader = new UDFClassLoader(libRoot);
- deprecatedClassLoader.markAsDeprecated();
+ if (deprecatedClassLoader != null) {
+ deprecatedClassLoader.markAsDeprecated();
+ }
return activeClassLoader;
}
@@ -113,6 +115,7 @@ public class UDFClassLoaderManager implements IService {
private static UDFClassLoaderManager INSTANCE = null;
public static synchronized UDFClassLoaderManager setupAndGetInstance(String libRoot) {
+
if (INSTANCE == null) {
INSTANCE = new UDFClassLoaderManager(libRoot);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 75399d0071..57241adf8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -61,6 +61,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
@@ -72,11 +74,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -1103,10 +1101,10 @@ public class ConfigNodeClient
}
@Override
- public TGetUDFJarResp getUDFJar(TGetUDFJarReq req) throws TException {
+ public TGetJarInListResp getUDFJar(TGetJarInListReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
try {
- TGetUDFJarResp resp = client.getUDFJar(req);
+ TGetJarInListResp resp = client.getUDFJar(req);
if (!updateConfigNodeLeader(resp.getStatus())) {
return resp;
}
@@ -1224,10 +1222,10 @@ public class ConfigNodeClient
}
@Override
- public TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req) throws TException {
+ public TGetJarInListResp getTriggerJar(TGetJarInListReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
try {
- TGetTriggerJarResp resp = client.getTriggerJar(req);
+ TGetJarInListResp resp = client.getTriggerJar(req);
if (!updateConfigNodeLeader(resp.getStatus())) {
return resp;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 7580b3f675..273670ad97 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -42,10 +42,8 @@ import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFManagementService;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetUDFJarResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
@@ -422,6 +420,7 @@ public class DataNode implements DataNodeMBean {
private void initUDFRelatedInstance() throws StartupException {
try {
UDFExecutableManager.setupAndGetInstance(config.getUdfTemporaryLibDir(), config.getUdfDir());
+ UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir());
} catch (IOException e) {
throw new StartupException(e);
}
@@ -469,7 +468,7 @@ public class DataNode implements DataNodeMBean {
try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
List<String> jarNameList =
udfInformationList.stream().map(UDFInformation::getJarName).collect(Collectors.toList());
- TGetUDFJarResp resp = configNodeClient.getUDFJar(new TGetUDFJarReq(jarNameList));
+ TGetJarInListResp resp = configNodeClient.getUDFJar(new TGetJarInListReq(jarNameList));
if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
throw new StartupException("Failed to get UDF jar from config node.");
}
@@ -576,7 +575,7 @@ public class DataNode implements DataNodeMBean {
triggerInformationList.stream()
.map(TriggerInformation::getJarName)
.collect(Collectors.toList());
- TGetTriggerJarResp resp = configNodeClient.getTriggerJar(new TGetTriggerJarReq(jarNameList));
+ TGetJarInListResp resp = configNodeClient.getTriggerJar(new TGetJarInListReq(jarNameList));
if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
throw new StartupException("Failed to get trigger jar from config node.");
}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 770f4baf49..557dab18b2 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -326,16 +326,6 @@ struct TGetUDFTableResp {
2: required list<binary> allUDFInformation
}
-// Get jars of the corresponding trigger
-struct TGetUDFJarReq {
- 1: required list<string> jarNameList
-}
-
-struct TGetUDFJarResp {
- 1: required common.TSStatus status
- 2: required list<binary> jarList
-}
-
// Trigger
enum TTriggerState {
// The intermediate state of Create trigger, the trigger need to create has not yet activated on any DataNodes.
@@ -377,12 +367,12 @@ struct TGetTriggerTableResp {
2: required list<binary> allTriggerInformation
}
-// Get jars of the corresponding trigger
-struct TGetTriggerJarReq {
+// Get the jars corresponding to the names in jarNameList
+struct TGetJarInListReq {
1: required list<string> jarNameList
}
-struct TGetTriggerJarResp {
+struct TGetJarInListResp {
1: required common.TSStatus status
2: required list<binary> jarList
}
@@ -820,7 +810,7 @@ service IConfigNodeRPCService {
/**
* Return the UDF jar list of the jar name list
*/
- TGetUDFJarResp getUDFJar(TGetUDFJarReq req)
+ TGetJarInListResp getUDFJar(TGetJarInListReq req)
// ======================================================
// Trigger
@@ -859,7 +849,7 @@ service IConfigNodeRPCService {
/**
* Return the trigger jar list of the trigger name list
*/
- TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req)
+ TGetJarInListResp getTriggerJar(TGetJarInListReq req)
// ======================================================
// Maintenance Tools