You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/10 07:52:54 UTC

[iotdb] branch iotdb-2274 created (now 20479e7)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch iotdb-2274
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 20479e7  [IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting

This branch includes the following new commits:

     new 20479e7  [IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: [IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-2274
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 20479e76b720edaea5df0c968dfd059b604a2c47
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Jan 10 15:52:09 2022 +0800

    [IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting
---
 .../org/apache/iotdb/trigger/TriggerExample.java   | 28 ++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java b/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
index 6e3ae52..d9b9dc0 100644
--- a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
+++ b/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
@@ -45,6 +45,14 @@ public class TriggerExample implements Trigger {
   private final LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
   private final MQTTHandler mqttHandler = new MQTTHandler();
 
+  // This field is required when the target MQTT server is the current IoTDB instance.
+  // When IoTDB restarts, registered triggers are restored before the MQTT service starts.
+  // For this trigger, if openSinkHandlers() were called in onCreate(), the IoTDB server would
+  // hang in openSinkHandlers() during recovery, because it cannot connect to the MQTT server
+  // (which has not been started yet).
+  // See IOTDB-2274 for more details.
+  private volatile boolean isSinksOpenedAfterCreation = false;
+
   private SlidingSizeWindowEvaluationHandler windowEvaluationHandler;
 
   @Override
@@ -54,8 +62,6 @@ public class TriggerExample implements Trigger {
     double lo = attributes.getDouble("lo");
     double hi = attributes.getDouble("hi");
 
-    openSinkHandlers();
-
     windowEvaluationHandler =
         new SlidingSizeWindowEvaluationHandler(
             new SlidingSizeWindowConfiguration(TSDataType.DOUBLE, 5, 5),
@@ -93,19 +99,33 @@ public class TriggerExample implements Trigger {
   }
 
   @Override
-  public Double fire(long timestamp, Double value) {
+  public Double fire(long timestamp, Double value) throws Exception {
+    tryOpenSinksFirstOnFire();
     windowEvaluationHandler.collect(timestamp, value);
     return value;
   }
 
   @Override
-  public double[] fire(long[] timestamps, double[] values) {
+  public double[] fire(long[] timestamps, double[] values) throws Exception {
+    tryOpenSinksFirstOnFire();
     for (int i = 0; i < timestamps.length; ++i) {
       windowEvaluationHandler.collect(timestamps[i], values[i]);
     }
     return values;
   }
 
+  // See IOTDB-2274 for more detail.
+  private void tryOpenSinksFirstOnFire() throws Exception {
+    if (!isSinksOpenedAfterCreation) {
+      synchronized (this) {
+        if (!isSinksOpenedAfterCreation) {
+          openSinkHandlers();
+          isSinksOpenedAfterCreation = true;
+        }
+      }
+    }
+  }
+
   private void openSinkHandlers() throws Exception {
     localIoTDBHandler.open(
         new LocalIoTDBConfiguration(