You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/10 07:52:55 UTC

[iotdb] 01/01: [IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-2274
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 20479e76b720edaea5df0c968dfd059b604a2c47
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Jan 10 15:52:09 2022 +0800

    [IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting
---
 .../org/apache/iotdb/trigger/TriggerExample.java   | 28 ++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java b/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
index 6e3ae52..d9b9dc0 100644
--- a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
+++ b/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
@@ -45,6 +45,14 @@ public class TriggerExample implements Trigger {
   private final LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
   private final MQTTHandler mqttHandler = new MQTTHandler();
 
+  // This field is required when the target MQTT server is the current IoTDB instance.
+  // When IoTDB restarts, the registered triggers will be restored before starting the MQTT service.
+  // For this trigger, if openSinkHandlers() is called in onCreate(), IoTDB server will be stuck
+  // in openSinkHandlers when recovering, because it can't connect to the MQTT server (not started
+  // yet).
+  // See IOTDB-2274 for more detail.
+  private volatile boolean isSinksOpenedAfterCreation = false;
+
   private SlidingSizeWindowEvaluationHandler windowEvaluationHandler;
 
   @Override
@@ -54,8 +62,6 @@ public class TriggerExample implements Trigger {
     double lo = attributes.getDouble("lo");
     double hi = attributes.getDouble("hi");
 
-    openSinkHandlers();
-
     windowEvaluationHandler =
         new SlidingSizeWindowEvaluationHandler(
             new SlidingSizeWindowConfiguration(TSDataType.DOUBLE, 5, 5),
@@ -93,19 +99,33 @@ public class TriggerExample implements Trigger {
   }
 
   @Override
-  public Double fire(long timestamp, Double value) {
+  public Double fire(long timestamp, Double value) throws Exception {
+    tryOpenSinksFirstOnFire();
     windowEvaluationHandler.collect(timestamp, value);
     return value;
   }
 
   @Override
-  public double[] fire(long[] timestamps, double[] values) {
+  public double[] fire(long[] timestamps, double[] values) throws Exception {
+    tryOpenSinksFirstOnFire();
     for (int i = 0; i < timestamps.length; ++i) {
       windowEvaluationHandler.collect(timestamps[i], values[i]);
     }
     return values;
   }
 
+  // Opens the sink handlers lazily on the first fire() call; see IOTDB-2274 for details.
+  private void tryOpenSinksFirstOnFire() throws Exception {
+    if (!isSinksOpenedAfterCreation) {
+      synchronized (this) {
+        if (!isSinksOpenedAfterCreation) {
+          openSinkHandlers();
+          isSinksOpenedAfterCreation = true;
+        }
+      }
+    }
+  }
+
   private void openSinkHandlers() throws Exception {
     localIoTDBHandler.open(
         new LocalIoTDBConfiguration(