You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/10 13:26:16 UTC

[iotdb] branch master updated: [IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting (#4748)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 45050cb  [IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting (#4748)
45050cb is described below

commit 45050cb05d97b9413d6e22e676f7a840c574c62f
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Jan 10 21:25:44 2022 +0800

    [IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting (#4748)
---
 .../org/apache/iotdb/trigger/TriggerExample.java   | 28 ++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java b/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
index 6e3ae52..d9b9dc0 100644
--- a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
+++ b/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
@@ -45,6 +45,14 @@ public class TriggerExample implements Trigger {
   private final LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
   private final MQTTHandler mqttHandler = new MQTTHandler();
 
+  // This field is required when the target MQTT server is the current IoTDB instance.
+  // When IoTDB restarts, the registered triggers are restored before the MQTT service starts.
+  // For this trigger, if openSinkHandlers() were called in onCreate(), the IoTDB server would get
+  // stuck in openSinkHandlers() during recovery, because it cannot connect to the MQTT server
+  // (not started yet).
+  // See IOTDB-2274 for more detail.
+  private volatile boolean isSinksOpenedAfterCreation = false;
+
   private SlidingSizeWindowEvaluationHandler windowEvaluationHandler;
 
   @Override
@@ -54,8 +62,6 @@ public class TriggerExample implements Trigger {
     double lo = attributes.getDouble("lo");
     double hi = attributes.getDouble("hi");
 
-    openSinkHandlers();
-
     windowEvaluationHandler =
         new SlidingSizeWindowEvaluationHandler(
             new SlidingSizeWindowConfiguration(TSDataType.DOUBLE, 5, 5),
@@ -93,19 +99,33 @@ public class TriggerExample implements Trigger {
   }
 
   @Override
-  public Double fire(long timestamp, Double value) {
+  public Double fire(long timestamp, Double value) throws Exception {
+    tryOpenSinksFirstOnFire();
     windowEvaluationHandler.collect(timestamp, value);
     return value;
   }
 
   @Override
-  public double[] fire(long[] timestamps, double[] values) {
+  public double[] fire(long[] timestamps, double[] values) throws Exception {
+    tryOpenSinksFirstOnFire();
     for (int i = 0; i < timestamps.length; ++i) {
       windowEvaluationHandler.collect(timestamps[i], values[i]);
     }
     return values;
   }
 
+  // Opens the sink handlers once, on the first fire() call. See IOTDB-2274 for more detail.
+  private void tryOpenSinksFirstOnFire() throws Exception {
+    if (!isSinksOpenedAfterCreation) { // unlocked check of the volatile flag
+      synchronized (this) {
+        if (!isSinksOpenedAfterCreation) { // re-check while holding the lock on this trigger
+          openSinkHandlers();
+          isSinksOpenedAfterCreation = true; // reached only if openSinkHandlers() did not throw
+        }
+      }
+    }
+  }
+
   private void openSinkHandlers() throws Exception {
     localIoTDBHandler.open(
         new LocalIoTDBConfiguration(