You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/20 06:55:26 UTC
[iotdb] branch master updated: [IOTDB-4669] Scheduled task for checking trigger-table of DataNode (#7660)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a12f45e42e [IOTDB-4669] Scheduled task for checking trigger-table of DataNode (#7660)
a12f45e42e is described below
commit a12f45e42e79ff0883ff327ee1683529164569c5
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Thu Oct 20 14:55:21 2022 +0800
[IOTDB-4669] Scheduled task for checking trigger-table of DataNode (#7660)
---
.../java/org/apache/iotdb/db/service/DataNode.java | 13 +++
.../db/trigger/executor/TriggerFireVisitor.java | 39 ++++----
.../trigger/service/TriggerInformationUpdater.java | 104 +++++++++++++++++++++
.../trigger/service/TriggerManagementService.java | 8 +-
4 files changed, 140 insertions(+), 24 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 75fd26fa74..d24af2095f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
import org.apache.iotdb.db.sync.SyncService;
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
+import org.apache.iotdb.db.trigger.service.TriggerInformationUpdater;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.utils.WALMode;
@@ -106,6 +107,10 @@ public class DataNode implements DataNodeMBean {
private final ResourcesInformationHolder resourcesInformationHolder =
new ResourcesInformationHolder();
+ /** Responsible for keeping trigger information up to date */
+ private final TriggerInformationUpdater triggerInformationUpdater =
+ new TriggerInformationUpdater();
+
private DataNode() {
// we do not init anything here, so that we can re-initialize the instance in IT.
}
@@ -459,6 +464,9 @@ public class DataNode implements DataNodeMBean {
logger.debug(
"get trigger executor: {}", triggerExecutor.getTriggerInformation().getTriggerName());
}
+
+ // start TriggerInformationUpdater
+ triggerInformationUpdater.startTriggerInformationUpdater();
}
private void getJarOfTriggers(List<TriggerInformation> triggerInformationList)
@@ -557,12 +565,17 @@ public class DataNode implements DataNodeMBean {
private void deactivate() {
logger.info("Deactivating IoTDB DataNode...");
+ stopTriggerRelatedServices();
// stopThreadPools();
registerManager.deregisterAll();
JMXService.deregisterMBean(mbeanName);
logger.info("IoTDB DataNode is deactivated.");
}
+ private void stopTriggerRelatedServices() {
+ triggerInformationUpdater.stopTriggerInformationUpdater();
+ }
+
private void setUncaughtExceptionHandler() {
Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index 1deb612671..a4f627e3fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -322,10 +322,11 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
TriggerFireResult result = TriggerFireResult.SUCCESS;
for (int i = 0; i < FIRE_RETRY_NUM; i++) {
if (TriggerManagementService.getInstance().needToFireOnAnotherDataNode(triggerName)) {
- TEndPoint endPoint =
- TriggerManagementService.getInstance().getEndPointForStatefulTrigger(triggerName);
+ TDataNodeLocation tDataNodeLocation =
+ TriggerManagementService.getInstance()
+ .getDataNodeLocationOfStatefulTrigger(triggerName);
try (SyncDataNodeInternalServiceClient client =
- INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(endPoint)) {
+ INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(tDataNodeLocation.getInternalEndPoint())) {
TFireTriggerReq req = new TFireTriggerReq(triggerName, tablet.serialize(), event.getId());
TFireTriggerResp resp = client.fireTrigger(req);
if (resp.foundExecutor) {
@@ -333,9 +334,9 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
return TriggerFireResult.construct(resp.getFireResult());
} else {
// update TDataNodeLocation of stateful trigger through config node
- if (!updateLocationOfStatefulTrigger(triggerName)) {
+ if (!updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation.getDataNodeId())) {
// if TDataNodeLocation is still the same, sleep before the retry
- Thread.sleep(1000);
+ Thread.sleep(4000);
}
}
} catch (IOException | TException e) {
@@ -347,15 +348,15 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
LOGGER.warn(
"Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}",
triggerName,
- endPoint.toString(),
+ tDataNodeLocation.getInternalEndPoint().toString(),
e);
// update TDataNodeLocation of stateful trigger through config node
- updateLocationOfStatefulTrigger(triggerName);
+ updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation.getDataNodeId());
} catch (Throwable e) {
LOGGER.warn(
"Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}",
triggerName,
- endPoint.toString(),
+ tDataNodeLocation.getInternalEndPoint().toString(),
e);
// do not retry if it is not due to bad network or no executor found
return TriggerManagementService.getInstance()
@@ -396,22 +397,20 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
}
/** Return true if the config node returns a new TDataNodeLocation */
- private boolean updateLocationOfStatefulTrigger(String triggerName) {
+ private boolean updateLocationOfStatefulTrigger(String triggerName, int currentDataNodeId) {
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- TDataNodeLocation tDataNodeLocation =
+ TDataNodeLocation newTDataNodeLocation =
configNodeClient.getLocationOfStatefulTrigger(triggerName).getDataNodeLocation();
- if (tDataNodeLocation != null) {
- TriggerManagementService.getInstance()
- .updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation);
- return TriggerManagementService.getInstance()
- .getTriggerInformation(triggerName)
- .getDataNodeLocation()
- .getDataNodeId()
- == tDataNodeLocation.getDataNodeId();
- } else {
- return false;
+ if (newTDataNodeLocation != null) {
+ if (currentDataNodeId != newTDataNodeLocation.getDataNodeId()) {
+ // indicates that the location of this stateful trigger has changed
+ TriggerManagementService.getInstance()
+ .updateLocationOfStatefulTrigger(triggerName, newTDataNodeLocation);
+ return true;
+ }
}
+ return false;
} catch (TException | IOException e) {
LOGGER.error(
"Failed to update location of stateful trigger({}) through config node and the cause is {}.",
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
new file mode 100644
index 0000000000..c6e0ff97d1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.trigger.service;
+
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Periodically pulls the stateful trigger table from the ConfigNode and hands the DataNode
+ * location of every returned trigger to {@link TriggerManagementService}.
+ *
+ * <p>The refresh runs on a dedicated single-thread scheduled executor with a fixed delay of
+ * {@code UPDATE_INTERVAL} milliseconds. The start and stop methods are not synchronized.
+ * NOTE(review): presumably they are only driven by the DataNode lifecycle; confirm before calling
+ * them from several threads.
+ */
+public class TriggerInformationUpdater {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TriggerInformationUpdater.class);
+
+  /** Client manager used to reach the ConfigNode when fetching the stateful trigger table. */
+  private static final IClientManager<PartitionRegionId, ConfigNodeClient>
+      CONFIG_NODE_CLIENT_MANAGER =
+          new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
+              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+
+  /** Dedicated single-thread scheduled executor on which {@link #updateTask()} runs. */
+  private final ScheduledExecutorService triggerInformationUpdateExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          "Stateful-Trigger-Information-Updater");
+
+  /** Handle of the scheduled refresh; {@code null} while the updater is not started. */
+  private Future<?> updateFuture;
+
+  /** Initial delay and delay between two refreshes, in milliseconds (one minute). */
+  private static final long UPDATE_INTERVAL = 1000 * 60;
+
+  /**
+   * Schedules {@link #updateTask()} with a fixed delay of {@code UPDATE_INTERVAL} milliseconds;
+   * the first run is delayed by the same amount. Does nothing if the updater is already started.
+   */
+  public void startTriggerInformationUpdater() {
+    if (updateFuture == null) {
+      updateFuture =
+          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+              triggerInformationUpdateExecutor,
+              this::updateTask,
+              UPDATE_INTERVAL,
+              UPDATE_INTERVAL,
+              TimeUnit.MILLISECONDS);
+      LOGGER.info("Stateful-Trigger-Information-Updater is successfully started.");
+    }
+  }
+
+  /**
+   * Cancels the scheduled refresh, if any. A refresh that is already running is not interrupted
+   * ({@code cancel(false)}), and the executor itself is not shut down here.
+   */
+  public void stopTriggerInformationUpdater() {
+    if (updateFuture != null) {
+      updateFuture.cancel(false);
+      updateFuture = null;
+      LOGGER.info("Stateful-Trigger-Information-Updater is successfully stopped.");
+    }
+  }
+
+  /**
+   * Performs one refresh: fetches the stateful trigger table from the ConfigNode, deserializes
+   * every entry and passes its trigger name and DataNode location to {@link
+   * TriggerManagementService#updateLocationOfStatefulTrigger}.
+   *
+   * <p>Any {@link Exception}, including the one raised for a non-success response status, is
+   * logged as a warning and not rethrown.
+   */
+  public void updateTask() {
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      TGetTriggerTableResp getStatefulTriggerTableResp = client.getStatefulTriggerTable();
+      if (getStatefulTriggerTableResp.getStatus().getCode()
+          != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new IoTDBException(
+            getStatefulTriggerTableResp.getStatus().getMessage(),
+            getStatefulTriggerTableResp.getStatus().getCode());
+      }
+      List<TriggerInformation> statefulTriggerInformationList =
+          getStatefulTriggerTableResp.getAllTriggerInformation().stream()
+              .map(TriggerInformation::deserialize)
+              .collect(Collectors.toList());
+      for (TriggerInformation triggerInformation : statefulTriggerInformationList) {
+        TriggerManagementService.getInstance()
+            .updateLocationOfStatefulTrigger(
+                triggerInformation.getTriggerName(), triggerInformation.getDataNodeLocation());
+      }
+    } catch (Exception e) {
+      // pass the exception as the last argument so that SLF4J logs its stack trace as well,
+      // instead of flattening it to toString() through String.format
+      LOGGER.warn("Meet error when updating trigger information", e);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index 62adc03055..8f471ff405 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.trigger.service;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.trigger.TriggerInformation;
@@ -319,12 +318,13 @@ public class TriggerManagementService {
/**
* @param triggerName given trigger
- * @return InternalRPC TEndPoint of DataNode where instance of given stateful trigger is on.
+ * @return TDataNodeLocation of the DataNode hosting the instance of the given stateful trigger,
+ * or null if the given trigger is not stateful.
*/
- public TEndPoint getEndPointForStatefulTrigger(String triggerName) {
+ public TDataNodeLocation getDataNodeLocationOfStatefulTrigger(String triggerName) {
TriggerInformation triggerInformation = triggerTable.getTriggerInformation(triggerName);
if (triggerInformation.isStateful()) {
- return triggerInformation.getDataNodeLocation().getInternalEndPoint();
+ return triggerInformation.getDataNodeLocation();
}
return null;
}