You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/14 05:46:07 UTC
[iotdb] branch master updated: [IOTDB-4589] Register/recover trigger when registering/restart a datanode (#7575)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d8012d29b8 [IOTDB-4589] Register/recover trigger when registering/restart a datanode (#7575)
d8012d29b8 is described below
commit d8012d29b8174ec026eeb67e30062ba9a7bfab5b
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Fri Oct 14 13:46:01 2022 +0800
[IOTDB-4589] Register/recover trigger when registering/restart a datanode (#7575)
---
.../consensus/request/ConfigPhysicalPlan.java | 4 +
.../consensus/request/ConfigPhysicalPlanType.java | 1 +
.../consensus/request/read/GetTriggerJarPlan.java | 68 +++++++++
.../consensus/response/DataNodeRegisterResp.java | 11 ++
.../consensus/response/TriggerJarResp.java | 52 +++++++
.../iotdb/confignode/manager/ConfigManager.java | 22 ++-
.../apache/iotdb/confignode/manager/IManager.java | 5 +
.../iotdb/confignode/manager/TriggerManager.java | 21 +++
.../iotdb/confignode/persistence/TriggerInfo.java | 24 +++
.../persistence/executor/ConfigPlanExecutor.java | 3 +
.../thrift/ConfigNodeRPCServiceProcessor.java | 8 +
.../request/ConfigPhysicalPlanSerDeTest.java | 14 ++
.../apache/iotdb/db/client/ConfigNodeClient.java | 18 +++
.../java/org/apache/iotdb/db/service/DataNode.java | 124 +++++++++++++++-
.../db/service/ResourcesInformationHolder.java | 43 ++++++
.../iotdb/db/trigger/executor/TriggerExecutor.java | 4 +
.../trigger/service/TriggerManagementService.java | 161 +++++++++++++--------
.../src/main/thrift/confignode.thrift | 16 ++
18 files changed, 535 insertions(+), 64 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 62d1bab979..19587cdac5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan
import org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
@@ -238,6 +239,9 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case GetTriggerTable:
req = new GetTriggerTablePlan();
break;
+ case GetTriggerJar:
+ req = new GetTriggerJarPlan();
+ break;
case CreateSchemaTemplate:
req = new CreateSchemaTemplatePlan();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index ca96142d45..29da4384d9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -88,6 +88,7 @@ public enum ConfigPhysicalPlanType {
DeleteTriggerInTable,
GetTriggerTable,
UpdateTriggerStateInTable,
+ GetTriggerJar,
GetRouting,
GetSeriesSlotList,
GetTimeSlotList
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerJarPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerJarPlan.java
new file mode 100644
index 0000000000..cde7842db7
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerJarPlan.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.read;
+
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class GetTriggerJarPlan extends ConfigPhysicalPlan {
+
+ private List<String> jarNames;
+
+ public GetTriggerJarPlan() {
+ super(ConfigPhysicalPlanType.GetTriggerJar);
+ }
+
+ public GetTriggerJarPlan(List<String> triggerNames) {
+ super(ConfigPhysicalPlanType.GetTriggerJar);
+ jarNames = triggerNames;
+ }
+
+ public List<String> getJarNames() {
+ return jarNames;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeInt(ConfigPhysicalPlanType.GetTriggerJar.ordinal());
+
+ ReadWriteIOUtils.write(jarNames.size(), stream);
+ for (String jarName : jarNames) {
+ ReadWriteIOUtils.write(jarName, stream);
+ }
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ int size = ReadWriteIOUtils.readInt(buffer);
+ jarNames = new ArrayList<>(size);
+ while (size > 0) {
+ jarNames.add(ReadWriteIOUtils.readString(buffer));
+ size--;
+ }
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
index 94d56fa854..5c4881ca77 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
+import java.nio.ByteBuffer;
import java.util.List;
public class DataNodeRegisterResp implements DataSet {
@@ -36,6 +37,7 @@ public class DataNodeRegisterResp implements DataSet {
private TGlobalConfig globalConfig;
private TRatisConfig ratisConfig;
private byte[] templateInfo;
+ private List<ByteBuffer> allTriggerInformation;
public DataNodeRegisterResp() {
this.dataNodeId = null;
@@ -70,6 +72,14 @@ public class DataNodeRegisterResp implements DataSet {
this.templateInfo = templateInfo;
}
+ public List<ByteBuffer> getTriggerInformation() {
+ return allTriggerInformation;
+ }
+
+ public void setTriggerInformation(List<ByteBuffer> triggerInformation) {
+ this.allTriggerInformation = triggerInformation;
+ }
+
public TDataNodeRegisterResp convertToRpcDataNodeRegisterResp() {
TDataNodeRegisterResp resp = new TDataNodeRegisterResp();
resp.setStatus(status);
@@ -81,6 +91,7 @@ public class DataNodeRegisterResp implements DataSet {
resp.setGlobalConfig(globalConfig);
resp.setTemplateInfo(templateInfo);
resp.setRatisConfig(ratisConfig);
+ resp.setAllTriggerInformation(allTriggerInformation);
}
return resp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TriggerJarResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TriggerJarResp.java
new file mode 100644
index 0000000000..fe05faaa3e
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TriggerJarResp.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
+import org.apache.iotdb.consensus.common.DataSet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class TriggerJarResp implements DataSet {
+
+ private TSStatus status;
+
+ private final List<ByteBuffer> jarList;
+
+ public TriggerJarResp(TSStatus status, List<ByteBuffer> jarList) {
+ this.status = status;
+ this.jarList = jarList;
+ }
+
+ public TSStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(TSStatus status) {
+ this.status = status;
+ }
+
+ public TGetTriggerJarResp convertToThriftResponse() throws IOException {
+ return new TGetTriggerJarResp(status, jarList);
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 63b8b44c71..793297de22 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -100,6 +100,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetRoutingResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
@@ -223,8 +225,14 @@ public class ConfigManager implements IManager {
TSStatus status = confirmLeader();
DataNodeRegisterResp dataSet;
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataSet = (DataNodeRegisterResp) nodeManager.registerDataNode(registerDataNodePlan);
- dataSet.setTemplateInfo(clusterSchemaManager.getAllTemplateSetInfo());
+ triggerManager.getTriggerInfo().acquireTriggerTableLock();
+ try {
+ dataSet = (DataNodeRegisterResp) nodeManager.registerDataNode(registerDataNodePlan);
+ dataSet.setTemplateInfo(clusterSchemaManager.getAllTemplateSetInfo());
+ dataSet.setTriggerInformation(triggerManager.getTriggerTable().getAllTriggerInformation());
+ } finally {
+ triggerManager.getTriggerInfo().releaseTriggerTableLock();
+ }
} else {
dataSet = new DataNodeRegisterResp();
dataSet.setStatus(status);
@@ -794,7 +802,15 @@ public class ConfigManager implements IManager {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
? triggerManager.getTriggerTable()
- : new TGetTriggerTableResp().setStatus(status);
+ : new TGetTriggerTableResp(status, Collections.emptyList());
+ }
+
+ @Override
+ public TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req) {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? triggerManager.getTriggerJar(req)
+ : new TGetTriggerJarResp(status, Collections.emptyList());
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 45707f8943..4d6aad5e57 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -61,6 +61,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetRoutingResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
@@ -307,6 +309,9 @@ public interface IManager {
/** Show trigger & DataNode start */
TGetTriggerTableResp getTriggerTable();
+ /** Get Trigger jar */
+ TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req);
+
/** Merge on all DataNodes */
TSStatus merge();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
index 4100251d30..49e7a42512 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@ -24,11 +24,15 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
+import org.apache.iotdb.confignode.consensus.response.TriggerJarResp;
import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
import org.apache.iotdb.confignode.persistence.TriggerInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -109,4 +113,21 @@ public class TriggerManager {
Collections.emptyList());
}
}
+
+ public TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req) {
+ try {
+ return ((TriggerJarResp)
+ configManager
+ .getConsensusManager()
+ .read(new GetTriggerJarPlan(req.getJarNameList()))
+ .getDataset())
+ .convertToThriftResponse();
+ } catch (IOException e) {
+ LOGGER.error("Fail to get TriggerJar", e);
+ return new TGetTriggerJarResp(
+ new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(e.getMessage()),
+ Collections.emptyList());
+ }
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
index 2f84afec3d..bcbdee832a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.executable.ExecutableManager;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.TriggerTable;
@@ -27,9 +28,11 @@ import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.consensus.response.TriggerJarResp;
import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -45,7 +48,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
@@ -168,6 +174,24 @@ public class TriggerInfo implements SnapshotProcessor {
triggerTable.getAllTriggerInformation());
}
+ public TriggerJarResp getTriggerJar(GetTriggerJarPlan physicalPlan) {
+ List<ByteBuffer> jarList = new ArrayList<>();
+ try {
+ for (String jarName : physicalPlan.getJarNames()) {
+ jarList.add(
+ ExecutableManager.transferToBytebuffer(
+ TriggerExecutableManager.getInstance().getFileStringUnderLibRootByName(jarName)));
+ }
+ } catch (Exception e) {
+ LOGGER.error("Get TriggerJar failed", e);
+ return new TriggerJarResp(
+ new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage("Get TriggerJar failed, because " + e.getMessage()),
+ Collections.emptyList());
+ }
+ return new TriggerJarResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList);
+ }
+
/** only used in Test */
public Map<String, TriggerInformation> getRawTriggerTable() {
return triggerTable.getTable();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 983aafcf79..efaa2d1658 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan
import org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan;
@@ -180,6 +181,8 @@ public class ConfigPlanExecutor {
return syncInfo.showPipe((ShowPipePlan) req);
case GetTriggerTable:
return triggerInfo.getTriggerTable();
+ case GetTriggerJar:
+ return triggerInfo.getTriggerJar((GetTriggerJarPlan) req);
case GetRouting:
return partitionInfo.getRouting((GetRoutingPlan) req);
case GetTimeSlotList:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 2718b56f59..67e45635fa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -100,6 +100,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -501,10 +503,16 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return configManager.dropTrigger(req);
}
+ @Override
public TGetTriggerTableResp getTriggerTable() {
return configManager.getTriggerTable();
}
+ @Override
+ public TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req) {
+ return configManager.getTriggerJar(req);
+ }
+
@Override
public TSStatus merge() throws TException {
return configManager.merge();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 410e3abd91..ed8f2a26c8 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan
import org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
@@ -1000,6 +1001,19 @@ public class ConfigPhysicalPlanSerDeTest {
updateTriggerStateInTablePlan1.getTriggerState());
}
+ @Test
+ public void GetTriggerJarPlanTest() throws IOException {
+ List<String> jarNames = new ArrayList<>();
+ jarNames.add("test1");
+ jarNames.add("test2");
+ GetTriggerJarPlan getTriggerJarPlan0 = new GetTriggerJarPlan(jarNames);
+
+ GetTriggerJarPlan getTriggerJarPlan1 =
+ (GetTriggerJarPlan)
+ ConfigPhysicalPlan.Factory.create(getTriggerJarPlan0.serializeToByteBuffer());
+ Assert.assertEquals(getTriggerJarPlan0.getJarNames(), getTriggerJarPlan1.getJarNames());
+ }
+
@Test
public void GetRoutingPlanTest() throws IOException {
GetRoutingPlan getRoutingPlan0 =
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index af69376851..fb8c87da7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -67,6 +67,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
@@ -934,6 +936,22 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TGetTriggerJarResp resp = client.getTriggerJar(req);
+ if (!updateConfigNodeLeader(resp.getStatus())) {
+ return resp;
+ }
+ } catch (TException e) {
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index f2493ee567..4b0e82f43b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -32,12 +32,16 @@ import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.service.StartupChecks;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
@@ -65,6 +69,8 @@ import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
import org.apache.iotdb.db.sync.SyncService;
+import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
+import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -75,8 +81,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
public class DataNode implements DataNodeMBean {
private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
@@ -94,6 +102,10 @@ public class DataNode implements DataNodeMBean {
private final TEndPoint thisNode = new TEndPoint();
+ /** Hold the information of trigger, udf...... */
+ private final ResourcesInformationHolder resourcesInformationHolder =
+ new ResourcesInformationHolder();
+
private DataNode() {
// we do not init anything here, so that we can re-initialize the instance in IT.
}
@@ -186,6 +198,9 @@ public class DataNode implements DataNodeMBean {
ClusterTemplateManager.getInstance()
.updateTemplateSetInfo(dataNodeRegisterResp.getTemplateInfo());
+ // store triggerInformationList
+ getTriggerInformationList(dataNodeRegisterResp.getAllTriggerInformation());
+
if (dataNodeRegisterResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| dataNodeRegisterResp.getStatus().getCode()
@@ -251,6 +266,10 @@ public class DataNode implements DataNodeMBean {
throw new StartupException("Cannot register to the cluster.");
}
+ private void prepareResources() throws StartupException {
+ prepareTriggerResources();
+ }
+
/** register services and set up DataNode */
private void active() throws StartupException {
try {
@@ -273,6 +292,9 @@ public class DataNode implements DataNodeMBean {
private void setUp() throws StartupException, QueryProcessException {
logger.info("Setting up IoTDB DataNode...");
+ // get resources for trigger,udf...
+ prepareResources();
+
Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
setUncaughtExceptionHandler();
initServiceProvider();
@@ -297,7 +319,6 @@ public class DataNode implements DataNodeMBean {
registerManager.register(DriverScheduler.getInstance());
registerUdfServices();
- initTriggerRelatedInstance();
logger.info(
"IoTDB DataNode is setting up, some storage groups may not be ready now, please wait several seconds...");
@@ -397,6 +418,107 @@ public class DataNode implements DataNodeMBean {
}
}
+ private void prepareTriggerResources() throws StartupException {
+ initTriggerRelatedInstance();
+ if (resourcesInformationHolder.getTriggerInformationList() == null
+ || resourcesInformationHolder.getTriggerInformationList().isEmpty()) {
+ return;
+ }
+
+ // get jars from config node
+ List<TriggerInformation> triggerNeedJarList = getJarListForTrigger();
+ int index = 0;
+ while (index < triggerNeedJarList.size()) {
+ List<TriggerInformation> curList = new ArrayList<>();
+ int offset = 0;
+ while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
+ && index + offset < triggerNeedJarList.size()) {
+ curList.add(triggerNeedJarList.get(index + offset));
+ offset++;
+ }
+ index += (offset + 1);
+ getJarOfTriggers(curList);
+ }
+
+ // create instances of triggers and do registration
+ try {
+ for (TriggerInformation triggerInformation :
+ resourcesInformationHolder.getTriggerInformationList()) {
+ TriggerManagementService.getInstance().doRegister(triggerInformation);
+ }
+ } catch (Exception e) {
+ throw new StartupException(e);
+ }
+ logger.debug("successfully registered all the triggers");
+ for (TriggerInformation triggerInformation :
+ TriggerManagementService.getInstance().getAllTriggerInformationInTriggerTable()) {
+ logger.debug("get trigger: {}", triggerInformation.getTriggerName());
+ }
+ for (TriggerExecutor triggerExecutor :
+ TriggerManagementService.getInstance().getAllTriggerExecutors()) {
+ logger.debug(
+ "get trigger executor: {}", triggerExecutor.getTriggerInformation().getTriggerName());
+ }
+ }
+
+ private void getJarOfTriggers(List<TriggerInformation> triggerInformationList)
+ throws StartupException {
+ try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+ List<String> jarNameList =
+ triggerInformationList.stream()
+ .map(TriggerInformation::getJarName)
+ .collect(Collectors.toList());
+ TGetTriggerJarResp resp = configNodeClient.getTriggerJar(new TGetTriggerJarReq(jarNameList));
+ if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
+ throw new StartupException("Failed to get trigger jar from config node.");
+ }
+ List<ByteBuffer> jarList = resp.getJarList();
+ for (int i = 0; i < triggerInformationList.size(); i++) {
+ TriggerExecutableManager.getInstance()
+ .writeToLibDir(jarList.get(i), triggerInformationList.get(i).getJarName());
+ }
+ } catch (IOException | TException e) {
+ throw new StartupException(e);
+ }
+ }
+
+ /** Generate a list for triggers that do not have jar on this node. */
+ private List<TriggerInformation> getJarListForTrigger() {
+ List<TriggerInformation> res = new ArrayList<>();
+ for (TriggerInformation triggerInformation :
+ resourcesInformationHolder.getTriggerInformationList()) {
+ if (triggerInformation.isStateful()) {
+ // jar of stateful trigger is not needed
+ continue;
+ }
+ // jar does not exist, add current triggerInformation to list
+ if (!TriggerExecutableManager.getInstance()
+ .hasFileUnderLibRoot(triggerInformation.getJarName())) {
+ res.add(triggerInformation);
+ } else {
+ try {
+ // local jar has conflicts with jar on config node, add current triggerInformation to list
+ if (!TriggerManagementService.getInstance().isLocalJarCorrect(triggerInformation)) {
+ res.add(triggerInformation);
+ }
+ } catch (TriggerManagementException e) {
+ res.add(triggerInformation);
+ }
+ }
+ }
+ return res;
+ }
+
+ private void getTriggerInformationList(List<ByteBuffer> allTriggerInformation) {
+ if (allTriggerInformation != null && !allTriggerInformation.isEmpty()) {
+ List<TriggerInformation> list = new ArrayList<>();
+ for (ByteBuffer triggerInformationByteBuffer : allTriggerInformation) {
+ list.add(TriggerInformation.deserialize(triggerInformationByteBuffer));
+ }
+ resourcesInformationHolder.setTriggerInformationList(list);
+ }
+ }
+
private void initSchemaEngine() {
long time = System.currentTimeMillis();
SchemaEngine.getInstance().init();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java b/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
new file mode 100644
index 0000000000..480b78e008
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service;
+
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+
+import java.util.List;
+
+public class ResourcesInformationHolder {
+ private static final int JAR_NUM_OF_ONE_RPC = 10;
+
+ /** store the list when registering in config node for preparing trigger related resources */
+ private List<TriggerInformation> triggerInformationList;
+
+ public static int getJarNumOfOneRpc() {
+ return JAR_NUM_OF_ONE_RPC;
+ }
+
+ public List<TriggerInformation> getTriggerInformationList() {
+ return triggerInformationList;
+ }
+
+ public void setTriggerInformationList(List<TriggerInformation> triggerInformationList) {
+ this.triggerInformationList = triggerInformationList;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerExecutor.java b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerExecutor.java
index bd5d3dd3e4..f5797651db 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerExecutor.java
@@ -62,4 +62,8 @@ public class TriggerExecutor {
methodName, System.lineSeparator())
+ e);
}
+
+ public TriggerInformation getTriggerInformation() {
+ return triggerInformation;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index 4994ec1270..930e4fcc1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -19,13 +19,15 @@
package org.apache.iotdb.db.trigger.service;
+import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.TriggerTable;
import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
-import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.path.PatternTreeMapFactory;
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
import org.apache.iotdb.trigger.api.Trigger;
@@ -38,6 +40,8 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@@ -52,7 +56,11 @@ public class TriggerManagementService {
private final Map<String, TriggerExecutor> executorMap;
- private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+ /**
+ * Maintain a PatternTree: PathPattern -> List<String> triggerNames Return the triggerNames of
+ * triggers whose PathPatterns match the given one.
+ */
+ private final PatternTreeMap<String, PatternTreeMapFactory.StringSerializer> patternTreeMap;
private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
@@ -60,6 +68,7 @@ public class TriggerManagementService {
this.lock = new ReentrantLock();
this.triggerTable = new TriggerTable();
this.executorMap = new ConcurrentHashMap<>();
+ this.patternTreeMap = PatternTreeMapFactory.getTriggerPatternTreeMap();
}
public void acquireLock() {
@@ -106,10 +115,14 @@ public class TriggerManagementService {
if (executor != null) {
executor.onDrop();
}
- // todo: delete trigger in PatternTree when implementing trigger fire
+
+ if (triggerInformation == null) {
+ return;
+ }
+ patternTreeMap.delete(triggerInformation.getPathPattern(), triggerName);
// if it is needed to delete jar file of the trigger, delete both jar file and md5
- if (triggerInformation != null && needToDeleteJar) {
+ if (needToDeleteJar) {
TriggerExecutableManager.getInstance()
.removeFileUnderLibRoot(triggerInformation.getJarName());
TriggerExecutableManager.getInstance().removeFileUnderTemporaryRoot(triggerName + ".txt");
@@ -119,76 +132,89 @@ public class TriggerManagementService {
}
}
- private void checkIfRegistered(TriggerInformation triggerInformation) throws IOException {
+ private void checkIfRegistered(TriggerInformation triggerInformation)
+ throws TriggerManagementException {
+ String triggerName = triggerInformation.getTriggerName();
+ String jarName = triggerInformation.getJarName();
+ if (triggerTable.containsTrigger(triggerName)
+ && TriggerExecutableManager.getInstance().hasFileUnderLibRoot(jarName)) {
+ if (!isLocalJarCorrect(triggerInformation)) {
+ // same jar name with different md5
+ String errorMessage =
+ String.format(
+ "Failed to registered trigger %s, "
+ + "because existed md5 of jar file for trigger %s is different from the new jar file. ",
+ triggerName, triggerName);
+ LOGGER.warn(errorMessage);
+ throw new TriggerManagementException(errorMessage);
+ }
+ }
+ }
+
+ /** check whether local jar is correct according to md5 */
+ public boolean isLocalJarCorrect(TriggerInformation triggerInformation)
+ throws TriggerManagementException {
+ String jarName = triggerInformation.getJarName();
String triggerName = triggerInformation.getTriggerName();
- if (triggerTable.containsTrigger(triggerName)) {
- String jarName = triggerInformation.getJarName();
- if (TriggerExecutableManager.getInstance().hasFileUnderLibRoot(jarName)) {
- // A jar with the same name exists, we need to check md5
- String existedMd5 = "";
- String md5FilePath = triggerName + ".txt";
-
- // if meet error when reading md5 from txt, we need to compute it again
- boolean hasComputed = false;
- if (TriggerExecutableManager.getInstance().hasFileUnderTemporaryRoot(md5FilePath)) {
- try {
- existedMd5 =
- TriggerExecutableManager.getInstance()
- .readTextFromFileUnderTemporaryRoot(md5FilePath);
- hasComputed = true;
- } catch (IOException e) {
- LOGGER.warn("Error occurred when trying to read md5 of {}", md5FilePath);
- }
- }
- if (!hasComputed) {
- try {
- existedMd5 =
- DigestUtils.md5Hex(
- Files.newInputStream(
- Paths.get(
- TriggerExecutableManager.getInstance().getLibRoot()
- + File.separator
- + triggerInformation.getJarName())));
- // save the md5 in a txt under trigger temporary lib
- TriggerExecutableManager.getInstance()
- .saveTextAsFileUnderTemporaryRoot(existedMd5, md5FilePath);
- } catch (IOException e) {
- String errorMessage =
- String.format(
- "Failed to registered trigger %s, "
- + "because error occurred when trying to compute md5 of jar file for trigger %s ",
- triggerName, triggerName);
- LOGGER.warn(errorMessage, e);
- throw new TriggerManagementException(errorMessage);
- }
- }
-
- if (!existedMd5.equals(triggerInformation.getJarFileMD5())) {
- // same jar name with different md5
- String errorMessage =
- String.format(
- "Failed to registered trigger %s, "
- + "because existed md5 of jar file for trigger %s is different from the new jar file. ",
- triggerName, triggerName);
- LOGGER.warn(errorMessage);
- throw new TriggerManagementException(errorMessage);
- }
+ // A jar with the same name exists, we need to check md5
+ String existedMd5 = "";
+ String md5FilePath = triggerName + ".txt";
+
+ // if meet error when reading md5 from txt, we need to compute it again
+ boolean hasComputed = false;
+ if (TriggerExecutableManager.getInstance().hasFileUnderTemporaryRoot(md5FilePath)) {
+ try {
+ existedMd5 =
+ TriggerExecutableManager.getInstance().readTextFromFileUnderTemporaryRoot(md5FilePath);
+ hasComputed = true;
+ } catch (IOException e) {
+ LOGGER.warn("Error occurred when trying to read md5 of {}", md5FilePath);
}
}
+ if (!hasComputed) {
+ try {
+ existedMd5 =
+ DigestUtils.md5Hex(
+ Files.newInputStream(
+ Paths.get(
+ TriggerExecutableManager.getInstance().getLibRoot()
+ + File.separator
+ + triggerInformation.getJarName())));
+ // save the md5 in a txt under trigger temporary lib
+ TriggerExecutableManager.getInstance()
+ .saveTextAsFileUnderTemporaryRoot(existedMd5, md5FilePath);
+ } catch (IOException e) {
+ String errorMessage =
+ String.format(
+ "Failed to registered trigger %s, "
+ + "because error occurred when trying to compute md5 of jar file for trigger %s ",
+ triggerName, triggerName);
+ LOGGER.warn(errorMessage, e);
+ throw new TriggerManagementException(errorMessage);
+ }
+ }
+ return existedMd5.equals(triggerInformation.getJarFileMD5());
}
- private void doRegister(TriggerInformation triggerInformation) throws IOException {
+ /**
+ * Only call this method directly for registering new data node, otherwise you need to call
+ * register().
+ */
+ public void doRegister(TriggerInformation triggerInformation) throws IOException {
try (TriggerClassLoader currentActiveClassLoader =
TriggerClassLoaderManager.getInstance().updateAndGetActiveClassLoader()) {
String triggerName = triggerInformation.getTriggerName();
+ // register in trigger-table
triggerTable.addTriggerInformation(triggerName, triggerInformation);
- // if it is a stateful trigger, we only create its instance on specified DataNode
+ // update PatternTreeMap
+ patternTreeMap.append(triggerInformation.getPathPattern(), triggerName);
+ // if it is a stateful trigger, we only maintain its instance on specified DataNode
if (!triggerInformation.isStateful()
|| triggerInformation.getDataNodeLocation().getDataNodeId() == DATA_NODE_ID) {
// get trigger instance
Trigger trigger =
constructTriggerInstance(triggerInformation.getClassName(), currentActiveClassLoader);
- // construct and save TriggerExecutor
+ // construct and save TriggerExecutor after successfully creating trigger instance
TriggerExecutor triggerExecutor = new TriggerExecutor(triggerInformation, trigger);
executorMap.put(triggerName, triggerExecutor);
}
@@ -217,6 +243,21 @@ public class TriggerManagementService {
"Failed to reflect trigger instance with className(%s), because %s", className, e));
}
}
+
+ // region only for test
+
+ @TestOnly
+ public List<TriggerInformation> getAllTriggerInformationInTriggerTable() {
+ return triggerTable.getAllTriggerInformation();
+ }
+
+ @TestOnly
+ public List<TriggerExecutor> getAllTriggerExecutors() {
+ return new ArrayList<>(executorMap.values());
+ }
+
+ // end region
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// singleton instance holder
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index c5ac8d708e..caedeef3cb 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -36,6 +36,7 @@ struct TDataNodeRegisterResp {
4: optional TGlobalConfig globalConfig
5: optional binary templateInfo
6: optional TRatisConfig ratisConfig
+ 7: optional list<binary> allTriggerInformation
}
struct TGlobalConfig {
@@ -337,6 +338,16 @@ struct TShowClusterResp {
4: required map<i32, string> nodeStatus
}
+// Get jars of the corresponding trigger
+struct TGetTriggerJarReq {
+ 1: required list<string> jarNameList
+}
+
+struct TGetTriggerJarResp {
+ 1: required common.TSStatus status
+ 2: required list<binary> jarList
+}
+
// Show datanodes
struct TDataNodeInfo {
1: required i32 dataNodeId
@@ -760,6 +771,11 @@ service IConfigNodeRPCService {
*/
TGetTriggerTableResp getTriggerTable()
+ /**
+ * Return the trigger jar list of the trigger name list
+ */
+ TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req)
+
// ======================================================
// Maintenance Tools
// ======================================================