You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/20 06:55:26 UTC

[iotdb] branch master updated: [IOTDB-4669] Scheduled task for checking trigger-table of DataNode (#7660)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a12f45e42e [IOTDB-4669] Scheduled task for checking trigger-table of DataNode (#7660)
a12f45e42e is described below

commit a12f45e42e79ff0883ff327ee1683529164569c5
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Thu Oct 20 14:55:21 2022 +0800

    [IOTDB-4669] Scheduled task for checking trigger-table of DataNode (#7660)
---
 .../java/org/apache/iotdb/db/service/DataNode.java |  13 +++
 .../db/trigger/executor/TriggerFireVisitor.java    |  39 ++++----
 .../trigger/service/TriggerInformationUpdater.java | 104 +++++++++++++++++++++
 .../trigger/service/TriggerManagementService.java  |   8 +-
 4 files changed, 140 insertions(+), 24 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 75fd26fa74..d24af2095f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
 import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
 import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
+import org.apache.iotdb.db.trigger.service.TriggerInformationUpdater;
 import org.apache.iotdb.db.trigger.service.TriggerManagementService;
 import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.utils.WALMode;
@@ -106,6 +107,10 @@ public class DataNode implements DataNodeMBean {
   private final ResourcesInformationHolder resourcesInformationHolder =
       new ResourcesInformationHolder();
 
+  /** Responsible for keeping trigger information up to date */
+  private final TriggerInformationUpdater triggerInformationUpdater =
+      new TriggerInformationUpdater();
+
   private DataNode() {
     // we do not init anything here, so that we can re-initialize the instance in IT.
   }
@@ -459,6 +464,9 @@ public class DataNode implements DataNodeMBean {
       logger.debug(
           "get trigger executor: {}", triggerExecutor.getTriggerInformation().getTriggerName());
     }
+
+    // start TriggerInformationUpdater
+    triggerInformationUpdater.startTriggerInformationUpdater();
   }
 
   private void getJarOfTriggers(List<TriggerInformation> triggerInformationList)
@@ -557,12 +565,17 @@ public class DataNode implements DataNodeMBean {
 
   private void deactivate() {
     logger.info("Deactivating IoTDB DataNode...");
+    stopTriggerRelatedServices();
     // stopThreadPools();
     registerManager.deregisterAll();
     JMXService.deregisterMBean(mbeanName);
     logger.info("IoTDB DataNode is deactivated.");
   }
 
+  private void stopTriggerRelatedServices() {
+    triggerInformationUpdater.stopTriggerInformationUpdater();
+  }
+
   private void setUncaughtExceptionHandler() {
     Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index 1deb612671..a4f627e3fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -322,10 +322,11 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
     TriggerFireResult result = TriggerFireResult.SUCCESS;
     for (int i = 0; i < FIRE_RETRY_NUM; i++) {
       if (TriggerManagementService.getInstance().needToFireOnAnotherDataNode(triggerName)) {
-        TEndPoint endPoint =
-            TriggerManagementService.getInstance().getEndPointForStatefulTrigger(triggerName);
+        TDataNodeLocation tDataNodeLocation =
+            TriggerManagementService.getInstance()
+                .getDataNodeLocationOfStatefulTrigger(triggerName);
         try (SyncDataNodeInternalServiceClient client =
-            INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(endPoint)) {
+            INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(tDataNodeLocation.getInternalEndPoint())) {
           TFireTriggerReq req = new TFireTriggerReq(triggerName, tablet.serialize(), event.getId());
           TFireTriggerResp resp = client.fireTrigger(req);
           if (resp.foundExecutor) {
@@ -333,9 +334,9 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
             return TriggerFireResult.construct(resp.getFireResult());
           } else {
             // update TDataNodeLocation of stateful trigger through config node
-            if (!updateLocationOfStatefulTrigger(triggerName)) {
+            if (!updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation.getDataNodeId())) {
               // if TDataNodeLocation is still the same, sleep 4s before the retry
-              Thread.sleep(1000);
+              Thread.sleep(4000);
             }
           }
         } catch (IOException | TException e) {
@@ -347,15 +348,15 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
           LOGGER.warn(
               "Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}",
               triggerName,
-              endPoint.toString(),
+              tDataNodeLocation.getInternalEndPoint().toString(),
               e);
           // update TDataNodeLocation of stateful trigger through config node
-          updateLocationOfStatefulTrigger(triggerName);
+          updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation.getDataNodeId());
         } catch (Throwable e) {
           LOGGER.warn(
               "Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}",
               triggerName,
-              endPoint.toString(),
+              tDataNodeLocation.getInternalEndPoint().toString(),
               e);
           // do not retry if it is not due to bad network or no executor found
           return TriggerManagementService.getInstance()
@@ -396,22 +397,20 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
   }
 
   /** Return true if the config node returns a new TDataNodeLocation */
-  private boolean updateLocationOfStatefulTrigger(String triggerName) {
+  private boolean updateLocationOfStatefulTrigger(String triggerName, int currentDataNodeId) {
     try (ConfigNodeClient configNodeClient =
         CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
-      TDataNodeLocation tDataNodeLocation =
+      TDataNodeLocation newTDataNodeLocation =
           configNodeClient.getLocationOfStatefulTrigger(triggerName).getDataNodeLocation();
-      if (tDataNodeLocation != null) {
-        TriggerManagementService.getInstance()
-            .updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation);
-        return TriggerManagementService.getInstance()
-                .getTriggerInformation(triggerName)
-                .getDataNodeLocation()
-                .getDataNodeId()
-            == tDataNodeLocation.getDataNodeId();
-      } else {
-        return false;
+      if (newTDataNodeLocation != null) {
+        if (currentDataNodeId != newTDataNodeLocation.getDataNodeId()) {
+          // indicates that the location of this stateful trigger has changed
+          TriggerManagementService.getInstance()
+              .updateLocationOfStatefulTrigger(triggerName, newTDataNodeLocation);
+          return true;
+        }
       }
+      return false;
     } catch (TException | IOException e) {
       LOGGER.error(
           "Failed to update location of stateful trigger({}) through config node and the cause is {}.",
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
new file mode 100644
index 0000000000..c6e0ff97d1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.trigger.service;
+
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.consensus.PartitionRegionId;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.trigger.TriggerInformation;
+import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class TriggerInformationUpdater {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TriggerInformationUpdater.class);
+
+  private static final IClientManager<PartitionRegionId, ConfigNodeClient>
+      CONFIG_NODE_CLIENT_MANAGER =
+          new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
+              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+
+  private final ScheduledExecutorService triggerInformationUpdateExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          "Stateful-Trigger-Information-Updater");
+
+  private Future<?> updateFuture;
+
+  private static final long UPDATE_INTERVAL = 1000 * 60;
+
+  public void startTriggerInformationUpdater() {
+    if (updateFuture == null) {
+      updateFuture =
+          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+              triggerInformationUpdateExecutor,
+              this::updateTask,
+              UPDATE_INTERVAL,
+              UPDATE_INTERVAL,
+              TimeUnit.MILLISECONDS);
+      LOGGER.info("Stateful-Trigger-Information-Updater is successfully started.");
+    }
+  }
+
+  public void stopTriggerInformationUpdater() {
+    if (updateFuture != null) {
+      updateFuture.cancel(false);
+      updateFuture = null;
+      LOGGER.info("Stateful-Trigger-Information-Updater is successfully stopped.");
+    }
+  }
+
+  public void updateTask() {
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      TGetTriggerTableResp getStatefulTriggerTableResp = client.getStatefulTriggerTable();
+      if (getStatefulTriggerTableResp.getStatus().getCode()
+          != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new IoTDBException(
+            getStatefulTriggerTableResp.getStatus().getMessage(),
+            getStatefulTriggerTableResp.getStatus().getCode());
+      }
+      List<TriggerInformation> statefulTriggerInformationList =
+          getStatefulTriggerTableResp.getAllTriggerInformation().stream()
+              .map(TriggerInformation::deserialize)
+              .collect(Collectors.toList());
+      for (TriggerInformation triggerInformation : statefulTriggerInformationList) {
+        TriggerManagementService.getInstance()
+            .updateLocationOfStatefulTrigger(
+                triggerInformation.getTriggerName(), triggerInformation.getDataNodeLocation());
+      }
+    } catch (Exception e) {
+      LOGGER.warn(String.format("Meet error when updating trigger information: %s", e));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
index 62adc03055..8f471ff405 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerManagementService.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.trigger.service;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PatternTreeMap;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
@@ -319,12 +318,13 @@ public class TriggerManagementService {
 
   /**
    * @param triggerName given trigger
-   * @return InternalRPC TEndPoint of DataNode where instance of given stateful trigger is on.
+   * @return TDataNodeLocation of DataNode where instance of given stateful trigger is on. Null if
+   *     trigger not found.
    */
-  public TEndPoint getEndPointForStatefulTrigger(String triggerName) {
+  public TDataNodeLocation getDataNodeLocationOfStatefulTrigger(String triggerName) {
     TriggerInformation triggerInformation = triggerTable.getTriggerInformation(triggerName);
     if (triggerInformation.isStateful()) {
-      return triggerInformation.getDataNodeLocation().getInternalEndPoint();
+      return triggerInformation.getDataNodeLocation();
     }
     return null;
   }