You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/19 08:51:29 UTC
[iotdb] branch master updated: [IOTDB-4627]Trigger transfer (#7643)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 127263dd3b [IOTDB-4627]Trigger transfer (#7643)
127263dd3b is described below
commit 127263dd3b20d45bc6342e915cc6c15f918e6938
Author: Weihao Li <60...@users.noreply.github.com>
AuthorDate: Wed Oct 19 16:51:23 2022 +0800
[IOTDB-4627]Trigger transfer (#7643)
---
.../confignode/client/DataNodeRequestType.java | 1 +
.../client/async/AsyncDataNodeClientPool.java | 7 ++
.../client/async/handlers/AsyncClientHandler.java | 1 +
.../consensus/request/ConfigPhysicalPlan.java | 12 +++
.../consensus/request/ConfigPhysicalPlanType.java | 5 +-
...ePlan.java => GetTransferringTriggersPlan.java} | 8 +-
.../request/read/GetTriggerTablePlan.java | 22 ++++-
.../write/trigger/UpdateTriggerLocationPlan.java | 76 ++++++++++++++++++
.../trigger/UpdateTriggersOnTransferNodesPlan.java | 75 +++++++++++++++++
.../TransferringTriggersResp.java} | 29 +++----
.../iotdb/confignode/manager/ConfigManager.java | 38 ++++++++-
.../apache/iotdb/confignode/manager/IManager.java | 3 +
.../iotdb/confignode/manager/TriggerManager.java | 93 +++++++++++++++++++++-
.../iotdb/confignode/manager/node/NodeManager.java | 37 ++++++++-
.../iotdb/confignode/persistence/TriggerInfo.java | 50 ++++++++----
.../persistence/executor/ConfigPlanExecutor.java | 13 ++-
.../procedure/env/ConfigNodeProcedureEnv.java | 3 -
.../procedure/impl/CreateTriggerProcedure.java | 5 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 5 ++
.../request/ConfigPhysicalPlanSerDeTest.java | 72 +++++++++++++++--
.../apache/iotdb/commons/trigger/TriggerTable.java | 40 ++++++++++
.../apache/iotdb/db/client/ConfigNodeClient.java | 16 ++++
.../java/org/apache/iotdb/db/service/DataNode.java | 4 -
.../impl/DataNodeInternalRPCServiceImpl.java | 17 ++++
.../trigger/service/TriggerManagementService.java | 38 +++++++--
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../src/main/thrift/confignode.thrift | 9 ++-
thrift/src/main/thrift/datanode.thrift | 12 +++
28 files changed, 622 insertions(+), 70 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index e2e52f3613..5fa8b3b0fa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -59,6 +59,7 @@ public enum DataNodeRequestType {
DROP_TRIGGER_INSTANCE,
ACTIVE_TRIGGER_INSTANCE,
INACTIVE_TRIGGER_INSTANCE,
+ UPDATE_TRIGGER_LOCATION,
/** Sync */
PRE_CREATE_PIPE,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 24865fddbe..f2736fe121 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,6 +171,12 @@ public class AsyncDataNodeClientPool {
(AsyncTSStatusRPCHandler)
clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
break;
+ case UPDATE_TRIGGER_LOCATION:
+ client.updateTriggerLocation(
+ (TUpdateTriggerLocationReq) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
case MERGE:
case FULL_MERGE:
client.merge(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index a10ef2306c..d23bf29633 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -179,6 +179,7 @@ public class AsyncClientHandler<Q, R> {
case DROP_TRIGGER_INSTANCE:
case ACTIVE_TRIGGER_INSTANCE:
case INACTIVE_TRIGGER_INSTANCE:
+ case UPDATE_TRIGGER_LOCATION:
case MERGE:
case FULL_MERGE:
case FLUSH:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 2afc6be446..be83d6f453 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan
import org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTriggersPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
@@ -72,7 +73,9 @@ import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchema
import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -303,6 +306,15 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case GetSeriesSlotList:
req = new GetSeriesSlotListPlan();
break;
+ case UpdateTriggersOnTransferNodes:
+ req = new UpdateTriggersOnTransferNodesPlan();
+ break;
+ case UpdateTriggerLocation:
+ req = new UpdateTriggerLocationPlan();
+ break;
+ case GetTransferringTriggers:
+ req = new GetTransferringTriggersPlan();
+ break;
default:
throw new IOException("unknown PhysicalPlan type: " + typeNum);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index c155a2bcd4..e81f4ddcc6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -92,5 +92,8 @@ public enum ConfigPhysicalPlanType {
GetTriggerJar,
GetRegionId,
GetSeriesSlotList,
- GetTimeSlotList
+ GetTimeSlotList,
+ UpdateTriggersOnTransferNodes,
+ UpdateTriggerLocation,
+ GetTransferringTriggers
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTransferringTriggersPlan.java
similarity index 83%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTransferringTriggersPlan.java
index da1caa3ca7..694fe0438f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTransferringTriggersPlan.java
@@ -26,15 +26,15 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class GetTriggerTablePlan extends ConfigPhysicalPlan {
+public class GetTransferringTriggersPlan extends ConfigPhysicalPlan {
- public GetTriggerTablePlan() {
- super(ConfigPhysicalPlanType.GetTriggerTable);
+ public GetTransferringTriggersPlan() {
+ super(ConfigPhysicalPlanType.GetTransferringTriggers);
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
- stream.writeInt(ConfigPhysicalPlanType.GetTriggerTable.ordinal());
+ stream.writeInt(ConfigPhysicalPlanType.GetTransferringTriggers.ordinal());
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
index da1caa3ca7..35919cb901 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.consensus.request.read;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -28,15 +29,34 @@ import java.nio.ByteBuffer;
public class GetTriggerTablePlan extends ConfigPhysicalPlan {
+ boolean onlyStateful;
+
public GetTriggerTablePlan() {
super(ConfigPhysicalPlanType.GetTriggerTable);
}
+ public GetTriggerTablePlan(boolean onlyStateful) {
+ this();
+ this.onlyStateful = onlyStateful;
+ }
+
+ public boolean isOnlyStateful() {
+ return onlyStateful;
+ }
+
+ public void setOnlyStateful(boolean onlyStateful) {
+ this.onlyStateful = onlyStateful;
+ }
+
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeInt(ConfigPhysicalPlanType.GetTriggerTable.ordinal());
+
+ ReadWriteIOUtils.write(onlyStateful, stream);
}
@Override
- protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ this.onlyStateful = ReadWriteIOUtils.readBool(buffer);
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerLocationPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerLocationPlan.java
new file mode 100644
index 0000000000..da5d302c39
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerLocationPlan.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.trigger;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class UpdateTriggerLocationPlan extends ConfigPhysicalPlan {
+
+ private String triggerName;
+ private TDataNodeLocation dataNodeLocation;
+
+ public UpdateTriggerLocationPlan() {
+ super(ConfigPhysicalPlanType.UpdateTriggerLocation);
+ }
+
+ public UpdateTriggerLocationPlan(String triggerName, TDataNodeLocation dataNodeLocation) {
+ super(ConfigPhysicalPlanType.UpdateTriggerLocation);
+ this.triggerName = triggerName;
+ this.dataNodeLocation = dataNodeLocation;
+ }
+
+ public String getTriggerName() {
+ return triggerName;
+ }
+
+ public void setTriggerName(String triggerName) {
+ this.triggerName = triggerName;
+ }
+
+ public TDataNodeLocation getDataNodeLocation() {
+ return dataNodeLocation;
+ }
+
+ public void setDataNodeLocation(TDataNodeLocation dataNodeLocation) {
+ this.dataNodeLocation = dataNodeLocation;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeInt(getType().ordinal());
+
+ ReadWriteIOUtils.write(triggerName, stream);
+ ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation, stream);
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ this.triggerName = ReadWriteIOUtils.readString(buffer);
+ this.dataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer);
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggersOnTransferNodesPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggersOnTransferNodesPlan.java
new file mode 100644
index 0000000000..aa8ad19f18
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggersOnTransferNodesPlan.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.trigger;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UpdateTriggersOnTransferNodesPlan extends ConfigPhysicalPlan {
+
+ private List<TDataNodeLocation> dataNodeLocations;
+
+ public UpdateTriggersOnTransferNodesPlan() {
+ super(ConfigPhysicalPlanType.UpdateTriggersOnTransferNodes);
+ }
+
+ public UpdateTriggersOnTransferNodesPlan(List<TDataNodeLocation> dataNodeLocations) {
+ super(ConfigPhysicalPlanType.UpdateTriggersOnTransferNodes);
+ this.dataNodeLocations = dataNodeLocations;
+ }
+
+ public List<TDataNodeLocation> getDataNodeLocations() {
+ return dataNodeLocations;
+ }
+
+ public void setDataNodeLocations(List<TDataNodeLocation> dataNodeLocations) {
+ this.dataNodeLocations = dataNodeLocations;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeInt(getType().ordinal());
+
+ ReadWriteIOUtils.write(dataNodeLocations.size(), stream);
+ for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
+ ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation, stream);
+ }
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ int size = ReadWriteIOUtils.readInt(buffer);
+ List<TDataNodeLocation> dataNodeLocations = new ArrayList<>(size);
+ while (size > 0) {
+ dataNodeLocations.add(ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer));
+ size--;
+ }
+ this.dataNodeLocations = dataNodeLocations;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TransferringTriggersResp.java
similarity index 54%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TransferringTriggersResp.java
index da1caa3ca7..11d6671e44 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/GetTriggerTablePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TransferringTriggersResp.java
@@ -17,26 +17,27 @@
* under the License.
*/
-package org.apache.iotdb.confignode.consensus.request.read;
+package org.apache.iotdb.confignode.consensus.response;
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
-import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.consensus.common.DataSet;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.List;
-public class GetTriggerTablePlan extends ConfigPhysicalPlan {
+public class TransferringTriggersResp implements DataSet {
- public GetTriggerTablePlan() {
- super(ConfigPhysicalPlanType.GetTriggerTable);
+ private List<String> transferringTriggers;
+
+ public TransferringTriggersResp() {}
+
+ public TransferringTriggersResp(List<String> transferringTriggers) {
+ this.transferringTriggers = transferringTriggers;
}
- @Override
- protected void serializeImpl(DataOutputStream stream) throws IOException {
- stream.writeInt(ConfigPhysicalPlanType.GetTriggerTable.ordinal());
+ public List<String> getTransferringTriggers() {
+ return transferringTriggers;
}
- @Override
- protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
+ public void setTransferringTriggers(List<String> transferringTriggers) {
+ this.transferringTriggers = transferringTriggers;
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 6718f618b7..1165a52c83 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -229,7 +230,8 @@ public class ConfigManager implements IManager {
try {
dataSet = (DataNodeRegisterResp) nodeManager.registerDataNode(registerDataNodePlan);
dataSet.setTemplateInfo(clusterSchemaManager.getAllTemplateSetInfo());
- dataSet.setTriggerInformation(triggerManager.getTriggerTable().getAllTriggerInformation());
+ dataSet.setTriggerInformation(
+ triggerManager.getTriggerTable(false).getAllTriggerInformation());
} finally {
triggerManager.getTriggerInfo().releaseTriggerTableLock();
}
@@ -801,7 +803,15 @@ public class ConfigManager implements IManager {
public TGetTriggerTableResp getTriggerTable() {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- ? triggerManager.getTriggerTable()
+ ? triggerManager.getTriggerTable(false)
+ : new TGetTriggerTableResp(status, Collections.emptyList());
+ }
+
+ @Override
+ public TGetTriggerTableResp getStatefulTriggerTable() {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? triggerManager.getTriggerTable(true)
: new TGetTriggerTableResp(status, Collections.emptyList());
}
@@ -1190,8 +1200,30 @@ public class ConfigManager implements IManager {
}
public TSStatus transfer(List<TDataNodeLocation> newUnknownDataList) {
+ Map<Integer, TDataNodeLocation> runningDataNodeLocationMap = new HashMap<>();
+ nodeManager
+ .filterDataNodeThroughStatus(NodeStatus.Running)
+ .forEach(
+ dataNodeConfiguration ->
+ runningDataNodeLocationMap.put(
+ dataNodeConfiguration.getLocation().getDataNodeId(),
+ dataNodeConfiguration.getLocation()));
+ if (runningDataNodeLocationMap.isEmpty()) {
+ // no running DataNode is available, so skip the transfer and return an error status
+ return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ }
+
+ newUnknownDataList.forEach(
+ dataNodeLocation -> runningDataNodeLocationMap.remove(dataNodeLocation.getDataNodeId()));
+
LOGGER.info("start Transfer of {}", newUnknownDataList);
// transfer trigger
- return triggerManager.transferTrigger(newUnknownDataList);
+ TSStatus transferResult =
+ triggerManager.transferTrigger(newUnknownDataList, runningDataNodeLocationMap);
+ if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn("Fail to transfer because {}, will retry", transferResult.getMessage());
+ }
+
+ return transferResult;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index e8b076ccf9..b45989915a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -310,6 +310,9 @@ public interface IManager {
/** Show trigger & DataNode start */
TGetTriggerTableResp getTriggerTable();
+ /** Get the table of stateful triggers; used when a DataNode refreshes its stateful trigger cache */
+ TGetTriggerTableResp getStatefulTriggerTable();
+
/** Get Trigger jar */
TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
index acce3d6fc8..7080a6d56a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@ -24,10 +24,18 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTriggersPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
+import org.apache.iotdb.confignode.consensus.response.TransferringTriggersResp;
import org.apache.iotdb.confignode.consensus.response.TriggerJarResp;
import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.persistence.TriggerInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
@@ -35,6 +43,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerJarResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.trigger.api.enums.TriggerEvent;
import org.apache.iotdb.trigger.api.enums.TriggerType;
@@ -46,6 +56,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
public class TriggerManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class);
@@ -101,10 +112,13 @@ public class TriggerManager {
return configManager.getProcedureManager().dropTrigger(req.getTriggerName());
}
- public TGetTriggerTableResp getTriggerTable() {
+ public TGetTriggerTableResp getTriggerTable(boolean onlyStateful) {
try {
return ((TriggerTableResp)
- configManager.getConsensusManager().read(new GetTriggerTablePlan()).getDataset())
+ configManager
+ .getConsensusManager()
+ .read(new GetTriggerTablePlan(onlyStateful))
+ .getDataset())
.convertToThriftResponse();
} catch (IOException e) {
LOGGER.error("Fail to get TriggerTable", e);
@@ -132,8 +146,79 @@ public class TriggerManager {
}
}
- public TSStatus transferTrigger(List<TDataNodeLocation> newUnknownDataList) {
- // TODO implement
+ /**
+ * Step1: Mark Stateful Triggers on UnknownDataNodes as {@link TTriggerState#TRANSFERRING}.
+ *
+ * <p>Step2: Get all Transferring Triggers marked in Step1.
+ *
+ * <p>Step3: For each trigger obtained in Step2, find the DataNode with the lowest load, then transfer
+ * the Stateful Trigger to it and update this information on all DataNodes.
+ *
+ * <p>Step4: Update the newest location on ConfigNodes.
+ *
+ * @param dataNodeLocationMap The DataNodes with {@link
+ * org.apache.iotdb.commons.cluster.NodeStatus#Running} State
+ * @return result of transferTrigger
+ */
+ public TSStatus transferTrigger(
+ List<TDataNodeLocation> newUnknownDataList,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+ TSStatus transferResult;
+ triggerInfo.acquireTriggerTableLock();
+ try {
+ ConsensusManager consensusManager = configManager.getConsensusManager();
+ NodeManager nodeManager = configManager.getNodeManager();
+
+ transferResult =
+ consensusManager
+ .write(new UpdateTriggersOnTransferNodesPlan(newUnknownDataList))
+ .getStatus();
+ if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return transferResult;
+ }
+
+ List<String> transferringTriggers =
+ ((TransferringTriggersResp)
+ consensusManager.read(new GetTransferringTriggersPlan()).getDataset())
+ .getTransferringTriggers();
+
+ for (String trigger : transferringTriggers) {
+ TDataNodeLocation newDataNodeLocation =
+ nodeManager.getLowestLoadDataNode(dataNodeLocationMap.keySet());
+
+ transferResult =
+ RpcUtils.squashResponseStatusList(
+ updateTriggerLocation(trigger, newDataNodeLocation, dataNodeLocationMap));
+ if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return transferResult;
+ }
+
+ transferResult =
+ consensusManager
+ .write(new UpdateTriggerLocationPlan(trigger, newDataNodeLocation))
+ .getStatus();
+ if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return transferResult;
+ }
+ }
+ } finally {
+ triggerInfo.releaseTriggerTableLock();
+ }
+
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
+
+ public List<TSStatus> updateTriggerLocation(
+ String triggerName,
+ TDataNodeLocation dataNodeLocation,
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
+ final TUpdateTriggerLocationReq request =
+ new TUpdateTriggerLocationReq(triggerName, dataNodeLocation);
+
+ AsyncClientHandler<TUpdateTriggerLocationReq, TSStatus> clientHandler =
+ new AsyncClientHandler<>(
+ DataNodeRequestType.UPDATE_TRIGGER_LOCATION, request, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return clientHandler.getResponseList();
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index b3fcb65ef7..6b36fa8035 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -255,8 +255,17 @@ public class NodeManager {
preCheckStatus.getStatus());
return preCheckStatus;
}
- // if add request to queue, then return to client
+
DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
+ // do transfer of the DataNodes before remove
+ if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataSet.setStatus(
+ new TSStatus(TSStatusCode.NODE_DELETE_FAILED_ERROR.getStatusCode())
+ .setMessage("Fail to do transfer of the DataNodes"));
+ return dataSet;
+ }
+ // if add request to queue, then return to client
boolean registerSucceed =
configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
TSStatus status;
@@ -716,8 +725,6 @@ public class NodeManager {
TSStatus transferResult = configManager.transfer(newUnknownNodes);
if (transferResult.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
oldUnknownNodes.addAll(newUnknownNodes);
- } else {
- LOGGER.warn("Fail to transfer because {}, will retry", transferResult.getMessage());
}
}
}
@@ -823,6 +830,30 @@ public class NodeManager {
return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
}
+ /**
+ * Get the TDataNodeLocation of the DataNode with the lowest load score among the given node IDs
+ *
+ * @return TDataNodeLocation
+ */
+ public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
+ AtomicInteger result = new AtomicInteger();
+ AtomicLong lowestLoadScore = new AtomicLong(Long.MAX_VALUE);
+
+ nodes.forEach(
+ nodeID -> {
+ BaseNodeCache cache = nodeCacheMap.get(nodeID);
+ long score = (cache == null) ? Long.MAX_VALUE : cache.getLoadScore();
+ if (score < lowestLoadScore.get()) {
+ result.set(nodeID);
+ lowestLoadScore.set(score);
+ }
+ });
+
+ LOGGER.info(
+ "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, lowestLoadScore);
+ return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
+ }
+
public boolean isNodeRemoving(int dataNodeId) {
DataNodeHeartbeatCache cache =
(DataNodeHeartbeatCache) configManager.getNodeManager().getNodeCacheMap().get(dataNodeId);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
index bcbdee832a..341129110f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TriggerInfo.java
@@ -28,12 +28,18 @@ import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTriggersPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
+import org.apache.iotdb.confignode.consensus.response.TransferringTriggersResp;
import org.apache.iotdb.confignode.consensus.response.TriggerJarResp;
import org.apache.iotdb.confignode.consensus.response.TriggerTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -91,13 +97,7 @@ public class TriggerInfo implements SnapshotProcessor {
triggerTableLock.unlock();
}
- /**
- * Validate whether the trigger can be created
- *
- * @param triggerName
- * @param jarName
- * @param jarMD5
- */
+ /** Validate whether the trigger can be created */
public void validate(String triggerName, String jarName, String jarMD5) {
if (triggerTable.containsTrigger(triggerName)) {
throw new TriggerManagementException(
@@ -114,11 +114,7 @@ public class TriggerInfo implements SnapshotProcessor {
}
}
- /**
- * Validate whether the trigger can be dropped
- *
- * @param triggerName
- */
+ /** Validate whether the trigger can be dropped */
public void validate(String triggerName) {
if (triggerTable.containsTrigger(triggerName)) {
return;
@@ -168,10 +164,16 @@ public class TriggerInfo implements SnapshotProcessor {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
- public TriggerTableResp getTriggerTable() {
- return new TriggerTableResp(
- new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
- triggerTable.getAllTriggerInformation());
+  /**
+   * Build the trigger-table response for a read plan.
+   *
+   * @param req when {@code req.isOnlyStateful()} is true only the stateful triggers are returned,
+   *     otherwise all triggers are returned
+   * @return a response carrying SUCCESS_STATUS and the selected trigger information
+   */
+  public TriggerTableResp getTriggerTable(GetTriggerTablePlan req) {
+    return new TriggerTableResp(
+        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+        req.isOnlyStateful()
+            ? triggerTable.getAllStatefulTriggerInformation()
+            : triggerTable.getAllTriggerInformation());
}
public TriggerJarResp getTriggerJar(GetTriggerJarPlan physicalPlan) {
@@ -192,6 +194,22 @@ public class TriggerInfo implements SnapshotProcessor {
return new TriggerJarResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList);
}
+ /**
+ * Wrap the result of triggerTable.getTransferringTriggers() in a TransferringTriggersResp. The
+ * plan argument is not read.
+ */
+ public TransferringTriggersResp getTransferringTriggers(GetTransferringTriggersPlan req) {
+ return new TransferringTriggersResp(triggerTable.getTransferringTriggers());
+ }
+
+ /**
+ * Forward the DataNode locations carried by the plan to
+ * triggerTable.updateTriggersOnTransferNodes and report SUCCESS_STATUS.
+ */
+ public TSStatus updateTriggersOnTransferNodes(UpdateTriggersOnTransferNodesPlan physicalPlan) {
+ triggerTable.updateTriggersOnTransferNodes(physicalPlan.getDataNodeLocations());
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ /**
+ * Record the new DataNode location of the trigger named in the plan, then set that trigger's
+ * state to ACTIVE. Always reports SUCCESS_STATUS.
+ */
+ public TSStatus updateTriggerLocation(UpdateTriggerLocationPlan physicalPlan) {
+ triggerTable.updateTriggerLocation(
+ physicalPlan.getTriggerName(), physicalPlan.getDataNodeLocation());
+ // the location is updated first; afterwards the trigger is switched to ACTIVE
+ triggerTable.setTriggerState(physicalPlan.getTriggerName(), TTriggerState.ACTIVE);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
/** only used in Test */
public Map<String, TriggerInformation> getRawTriggerTable() {
return triggerTable.getTable();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 4b5f4e49ef..ee2a4849e1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -35,7 +35,9 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan
import org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTriggersPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan;
@@ -71,7 +73,9 @@ import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchema
import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException;
import org.apache.iotdb.confignode.persistence.AuthorInfo;
@@ -181,9 +185,11 @@ public class ConfigPlanExecutor {
case ShowPipe:
return syncInfo.showPipe((ShowPipePlan) req);
case GetTriggerTable:
- return triggerInfo.getTriggerTable();
+ return triggerInfo.getTriggerTable((GetTriggerTablePlan) req);
case GetTriggerJar:
return triggerInfo.getTriggerJar((GetTriggerJarPlan) req);
+ case GetTransferringTriggers:
+ return triggerInfo.getTransferringTriggers((GetTransferringTriggersPlan) req);
case GetRegionId:
return partitionInfo.getRegionId((GetRegionIdPlan) req);
case GetTimeSlotList:
@@ -267,6 +273,11 @@ public class ConfigPlanExecutor {
return triggerInfo.deleteTriggerInTable((DeleteTriggerInTablePlan) physicalPlan);
case UpdateTriggerStateInTable:
return triggerInfo.updateTriggerStateInTable((UpdateTriggerStateInTablePlan) physicalPlan);
+ case UpdateTriggersOnTransferNodes:
+ return triggerInfo.updateTriggersOnTransferNodes(
+ (UpdateTriggersOnTransferNodesPlan) physicalPlan);
+ case UpdateTriggerLocation:
+ return triggerInfo.updateTriggerLocation((UpdateTriggerLocationPlan) physicalPlan);
case CreateSchemaTemplate:
return clusterSchemaInfo.createSchemaTemplate((CreateSchemaTemplatePlan) physicalPlan);
case UpdateRegionLocation:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index b1f73ff5de..5bd99708ec 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -487,9 +487,6 @@ public class ConfigNodeProcedureEnv {
AsyncClientHandler<TCreateTriggerInstanceReq, TSStatus> clientHandler =
new AsyncClientHandler<>(
DataNodeRequestType.CREATE_TRIGGER_INSTANCE, request, dataNodeLocationMap);
- // TODO: The request sent to DataNodes which stateful triggerInstance needn't to be created
- // don't set
- // JarFile
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
return clientHandler.getResponseList();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
index a9fb7bf404..e8c8ac4813 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateTriggerProcedure.java
@@ -206,11 +206,10 @@ public class CreateTriggerProcedure extends AbstractNodeProcedure<CreateTriggerS
if (RpcUtils.squashResponseStatusList(
env.dropTriggerOnDataNodes(triggerInformation.getTriggerName(), false))
.getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- } else {
+ != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new TriggerManagementException(
String.format(
- "Fail to [CONFIG_NODE_INACTIVE] rollback of trigger [{}]",
+ "Fail to [CONFIG_NODE_INACTIVE] rollback of trigger [%s]",
triggerInformation.getTriggerName()));
}
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index a777f98eac..8603f18648 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -509,6 +509,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return configManager.getTriggerTable();
}
+ /** Delegates to configManager.getStatefulTriggerTable() and returns its response unchanged. */
+ @Override
+ public TGetTriggerTableResp getStatefulTriggerTable() {
+ return configManager.getStatefulTriggerTable();
+ }
+
@Override
public TGetLocationForTriggerResp getLocationOfStatefulTrigger(String triggerName) {
// todo: implementation
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 8e5aaff06d..4858362c81 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan
import org.apache.iotdb.confignode.consensus.request.read.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
+import org.apache.iotdb.confignode.consensus.request.read.GetTransferringTriggersPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetTriggerTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
@@ -85,7 +86,9 @@ import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchema
import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan;
+import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan;
import org.apache.iotdb.confignode.persistence.partition.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.RegionDeleteTask;
import org.apache.iotdb.confignode.procedure.Procedure;
@@ -943,11 +946,13 @@ public class ConfigPhysicalPlanSerDeTest {
}
@Test
- public void GetTriggerTablePlan() throws IOException {
- GetTriggerTablePlan getTriggerTablePlan0 = new GetTriggerTablePlan();
- Assert.assertTrue(
- ConfigPhysicalPlan.Factory.create(getTriggerTablePlan0.serializeToByteBuffer())
- instanceof GetTriggerTablePlan);
+  public void GetTriggerTablePlanTest() throws IOException {
+    // Round-trip both values of the flag: checking only `true` would not notice a deserializer
+    // that always yields `true`.
+    for (boolean onlyStateful : new boolean[] {true, false}) {
+      GetTriggerTablePlan getTriggerTablePlan0 = new GetTriggerTablePlan(onlyStateful);
+      GetTriggerTablePlan getTriggerTablePlan1 =
+          (GetTriggerTablePlan)
+              ConfigPhysicalPlan.Factory.create(getTriggerTablePlan0.serializeToByteBuffer());
+      Assert.assertEquals(onlyStateful, getTriggerTablePlan1.isOnlyStateful());
+    }
}
@Test
@@ -1044,4 +1049,61 @@ public class ConfigPhysicalPlanSerDeTest {
ConfigPhysicalPlan.Factory.create(getSeriesSlotListPlan0.serializeToByteBuffer());
Assert.assertEquals(getSeriesSlotListPlan0, getSeriesSlotListPlan1);
}
+
+  @Test
+  public void UpdateTriggersOnTransferNodesPlanTest() throws IOException {
+    // Two locations that differ only in their DataNode id.
+    List<TDataNodeLocation> dataNodeLocations = new ArrayList<>(2);
+    for (int dataNodeId : new int[] {10000, 20000}) {
+      dataNodeLocations.add(
+          new TDataNodeLocation(
+              dataNodeId,
+              new TEndPoint("127.0.0.1", 6600),
+              new TEndPoint("127.0.0.1", 7700),
+              new TEndPoint("127.0.0.1", 8800),
+              new TEndPoint("127.0.0.1", 9900),
+              new TEndPoint("127.0.0.1", 11000)));
+    }
+
+    // Serialize, deserialize through the factory, and compare the carried locations.
+    UpdateTriggersOnTransferNodesPlan expected =
+        new UpdateTriggersOnTransferNodesPlan(dataNodeLocations);
+    UpdateTriggersOnTransferNodesPlan actual =
+        (UpdateTriggersOnTransferNodesPlan)
+            ConfigPhysicalPlan.Factory.create(expected.serializeToByteBuffer());
+
+    Assert.assertEquals(expected.getDataNodeLocations(), actual.getDataNodeLocations());
+  }
+
+ // Serializes an UpdateTriggerLocationPlan, rebuilds it through the plan factory and checks that
+ // the trigger name and the DataNode location are equal to the ones of the original plan.
+ @Test
+ public void UpdateTriggerLocationPlanTest() throws IOException {
+ UpdateTriggerLocationPlan plan0 =
+ new UpdateTriggerLocationPlan(
+ "test",
+ new TDataNodeLocation(
+ 10000,
+ new TEndPoint("127.0.0.1", 6600),
+ new TEndPoint("127.0.0.1", 7700),
+ new TEndPoint("127.0.0.1", 8800),
+ new TEndPoint("127.0.0.1", 9900),
+ new TEndPoint("127.0.0.1", 11000)));
+ UpdateTriggerLocationPlan plan1 =
+ (UpdateTriggerLocationPlan)
+ ConfigPhysicalPlan.Factory.create(plan0.serializeToByteBuffer());
+
+ Assert.assertEquals(plan0.getTriggerName(), plan1.getTriggerName());
+ Assert.assertEquals(plan0.getDataNodeLocation(), plan1.getDataNodeLocation());
+ }
+
+ // The plan is constructed without arguments, so the test only checks that the factory rebuilds
+ // an instance of the same plan class from the serialized bytes.
+ @Test
+ public void GetTransferringTriggersPlanTest() throws IOException {
+ GetTransferringTriggersPlan getTransferringTriggerPlan0 = new GetTransferringTriggersPlan();
+ Assert.assertTrue(
+ ConfigPhysicalPlan.Factory.create(getTransferringTriggerPlan0.serializeToByteBuffer())
+ instanceof GetTransferringTriggersPlan);
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
index eed43d15fd..fbc09275ae 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/trigger/TriggerTable.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.commons.trigger;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -27,9 +28,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
/** This Class used to save the information of Triggers and implements methods of manipulate it. */
@NotThreadSafe
@@ -78,6 +82,42 @@ public class TriggerTable {
return new ArrayList<>(triggerTable.values());
}
+  /** Collect the information of every trigger in the table for which isStateful() is true. */
+  public List<TriggerInformation> getAllStatefulTriggerInformation() {
+    return triggerTable.values().stream()
+        .filter(information -> information.isStateful())
+        .collect(Collectors.toList());
+  }
+
+  /** Collect the names of all triggers whose state currently is TRANSFERRING. */
+  public List<String> getTransferringTriggers() {
+    List<String> transferringTriggerNames = new ArrayList<>();
+    for (TriggerInformation triggerInformation : triggerTable.values()) {
+      if (triggerInformation.getTriggerState() == TTriggerState.TRANSFERRING) {
+        transferringTriggerNames.add(triggerInformation.getTriggerName());
+      }
+    }
+    return transferringTriggerNames;
+  }
+
+  /**
+   * Set the state of every stateful trigger whose DataNodeLocation is contained in {@code
+   * transferNodes} to TRANSFERRING. All other triggers are left untouched.
+   */
+  public void updateTriggersOnTransferNodes(List<TDataNodeLocation> transferNodes) {
+    // copy into a set so that each membership test is a hash lookup
+    Set<TDataNodeLocation> transferNodeSet = new HashSet<>(transferNodes);
+    for (TriggerInformation triggerInformation : triggerTable.values()) {
+      if (!triggerInformation.isStateful()) {
+        continue;
+      }
+      if (transferNodeSet.contains(triggerInformation.getDataNodeLocation())) {
+        triggerInformation.setTriggerState(TTriggerState.TRANSFERRING);
+      }
+    }
+  }
+
+  /**
+   * Set the DataNodeLocation of the named trigger.
+   *
+   * <p>The stored TriggerInformation is modified in place, so it does not need to be put back into
+   * the table. If no trigger with that name is present the call does nothing instead of failing
+   * with a NullPointerException.
+   *
+   * @param triggerName name of the trigger to update
+   * @param dataNodeLocation location to store for the trigger
+   */
+  public void updateTriggerLocation(String triggerName, TDataNodeLocation dataNodeLocation) {
+    TriggerInformation triggerInformation = triggerTable.get(triggerName);
+    if (triggerInformation == null) {
+      // nothing registered under this name: there is no location to update
+      return;
+    }
+    triggerInformation.setDataNodeLocation(dataNodeLocation);
+  }
+
public boolean isEmpty() {
return triggerTable.isEmpty();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 9adc73dbd8..a3d4f38722 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -953,6 +953,22 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
+  /**
+   * Request the table of stateful triggers from the ConfigNode.
+   *
+   * <p>Up to RETRY_NUM attempts are made. A response is returned as soon as {@code
+   * updateConfigNodeLeader} reports false for its status; otherwise, or when the call throws a
+   * TException (in which case configLeader is cleared), the client reconnects and tries again.
+   *
+   * @return the response of the first attempt that needed no leader update
+   * @throws TException with MSG_RECONNECTION_FAIL when all attempts are used up
+   */
+  @Override
+  public TGetTriggerTableResp getStatefulTriggerTable() throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        // must be the stateful variant: getTriggerTable() is the RPC for the whole trigger table
+        TGetTriggerTableResp resp = client.getStatefulTriggerTable();
+        if (!updateConfigNodeLeader(resp.getStatus())) {
+          return resp;
+        }
+      } catch (TException e) {
+        configLeader = null;
+      }
+      reconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
@Override
public TGetTriggerJarResp getTriggerJar(TGetTriggerJarReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 4b0e82f43b..75fd26fa74 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -487,10 +487,6 @@ public class DataNode implements DataNodeMBean {
List<TriggerInformation> res = new ArrayList<>();
for (TriggerInformation triggerInformation :
resourcesInformationHolder.getTriggerInformationList()) {
- if (triggerInformation.isStateful()) {
- // jar of stateful trigger is not needed
- continue;
- }
// jar does not exist, add current triggerInformation to list
if (!TriggerExecutableManager.getInstance()
.hasFileUnderLibRoot(triggerInformation.getJarName())) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 2341017cd3..30a2eeb644 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -140,6 +140,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
@@ -1032,6 +1033,22 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
+  /**
+   * Apply a new DataNodeLocation for a trigger on this DataNode by calling {@code
+   * TriggerManagementService.updateLocationOfStatefulTrigger}.
+   *
+   * @param req carries the trigger name and its new location
+   * @return SUCCESS_STATUS, or UPDATE_TRIGGER_LOCATION_ERROR with the exception message when the
+   *     update throws
+   */
+  @Override
+  public TSStatus updateTriggerLocation(TUpdateTriggerLocationReq req) throws TException {
+    try {
+      TriggerManagementService.getInstance()
+          .updateLocationOfStatefulTrigger(req.getTriggerName(), req.getNewLocation());
+    } catch (Exception e) {
+      // One argument per placeholder, with the exception itself passed last so the logger records
+      // its stack trace as well.
+      LOGGER.error(
+          "Error occurred during update Location for trigger: {}. The cause is {}.",
+          req.getTriggerName(),
+          e.getMessage(),
+          e);
+      return new TSStatus(TSStatusCode.UPDATE_TRIGGER_LOCATION_ERROR.getStatusCode())
+          .setMessage(e.getMessage());
+    }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
@Override
public TFireTriggerResp fireTrigger(TFireTriggerReq req) {
String triggerName = req.getTriggerName();
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index fed28359cc..62adc03055 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -135,15 +135,40 @@ public class TriggerManagementService {
}
}
- public void updateLocationOfStatefulTrigger(
- String triggerName, TDataNodeLocation tDataNodeLocation) {
+ /**
+ * Record a new DataNodeLocation for a stateful trigger and bring the local executor in line
+ * with it: the executor is dropped when the trigger now lives on another DataNode, and created
+ * when it now lives on this DataNode and no executor exists yet. Unknown or stateless triggers
+ * are ignored.
+ *
+ * @param triggerName name of the trigger to update
+ * @param newLocation location that now hosts the trigger instance
+ * @throws IOException presumably raised while obtaining or closing the TriggerClassLoader; TODO
+ * confirm
+ */
+ public void updateLocationOfStatefulTrigger(String triggerName, TDataNodeLocation newLocation)
+ throws IOException {
try {
+ // NOTE(review): acquireLock() sits inside the try, so releaseLock() in the finally block also
+ // runs when acquireLock() itself throws; confirm that this is safe
acquireLock();
TriggerInformation triggerInformation = triggerTable.getTriggerInformation(triggerName);
- triggerInformation.setDataNodeLocation(tDataNodeLocation);
- triggerTable.setTriggerInformation(triggerName, triggerInformation);
- } catch (Exception e) {
- LOGGER.warn("Failed to update location of trigger({}), the cause is: {}", triggerName, e);
+ // nothing to do for a trigger that is not in the table or is not stateful
+ if (triggerInformation == null || !triggerInformation.isStateful()) {
+ return;
+ }
+ triggerInformation.setDataNodeLocation(newLocation);
+ triggerTable.addTriggerInformation(triggerName, triggerInformation);
+ if (newLocation.getDataNodeId() != DATA_NODE_ID) {
+ // The instance of stateful trigger is created on another DataNode. We need to drop the
+ // instance if it exists on this DataNode
+ TriggerExecutor triggerExecutor = executorMap.remove(triggerName);
+ if (triggerExecutor != null) {
+ triggerExecutor.onDrop();
+ }
+ } else {
+ TriggerExecutor triggerExecutor = executorMap.get(triggerName);
+ // an executor for this trigger is already registered here, keep it
+ if (triggerExecutor != null) {
+ return;
+ }
+ // newLocation of stateful trigger is this DataNode, we need to create its instance if it
+ // does not exist.
+ try (TriggerClassLoader currentActiveClassLoader =
+ TriggerClassLoaderManager.getInstance().updateAndGetActiveClassLoader()) {
+ TriggerExecutor newExecutor =
+ new TriggerExecutor(
+ triggerInformation,
+ constructTriggerInstance(
+ triggerInformation.getClassName(), currentActiveClassLoader));
+ executorMap.put(triggerName, newExecutor);
+ }
+ }
} finally {
releaseLock();
}
@@ -182,7 +207,6 @@ public class TriggerManagementService {
private void checkIfRegistered(TriggerInformation triggerInformation)
throws TriggerManagementException {
-
String triggerName = triggerInformation.getTriggerName();
String jarName = triggerInformation.getJarName();
if (triggerTable.containsTrigger(triggerName)
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 898d6183cf..c5d56e571a 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -81,6 +81,7 @@ public enum TSStatusCode {
CREATE_TRIGGER_INSTANCE_ERROR(362),
ACTIVE_TRIGGER_INSTANCE_ERROR(363),
DROP_TRIGGER_INSTANCE_ERROR(364),
+ UPDATE_TRIGGER_LOCATION_ERROR(365),
EXECUTE_STATEMENT_ERROR(400),
SQL_PARSE_ERROR(401),
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index fb629fb094..6c87d06b5f 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -314,6 +314,8 @@ enum TTriggerState {
ACTIVE
// The intermediate state of Drop trigger, the cluster is in the process of removing the trigger.
DROPPING
+ // The intermediate state of Transfer trigger, the cluster is in the process of transferring the trigger.
+ TRANSFERRING
}
struct TCreateTriggerReq {
@@ -785,10 +787,15 @@ service IConfigNodeRPCService {
TGetLocationForTriggerResp getLocationOfStatefulTrigger(string triggerName)
/**
- * Return the trigger table of config leader
+ * Return the trigger table
*/
TGetTriggerTableResp getTriggerTable()
+ /**
+ * Return the Stateful trigger table
+ */
+ TGetTriggerTableResp getStatefulTriggerTable()
+
/**
* Return the trigger jar list of the trigger name list
*/
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index d75aaa8be8..aa43b7a300 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -187,6 +187,11 @@ struct TDropTriggerInstanceReq {
2: required bool needToDeleteJarFile
}
+// Request of updateTriggerLocation: names a trigger and the DataNodeLocation to record for it.
+struct TUpdateTriggerLocationReq {
+ 1: required string triggerName
+ 2: required common.TDataNodeLocation newLocation
+}
+
struct TFireTriggerReq {
1: required string triggerName
2: required binary tablet
@@ -471,6 +476,13 @@ service IDataNodeRPCService {
**/
common.TSStatus dropTriggerInstance(TDropTriggerInstanceReq req)
+ /**
+ * Config node will renew DataNodeLocation of a stateful trigger.
+ *
+ * @param trigger name, new DataNodeLocation
+ **/
+ common.TSStatus updateTriggerLocation (TUpdateTriggerLocationReq req)
+
/**
* Fire a stateful trigger on current data node.
*