You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/10 07:52:55 UTC
[iotdb] 01/01: [IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-2274
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 20479e76b720edaea5df0c968dfd059b604a2c47
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Jan 10 15:52:09 2022 +0800
[IOTDB-2274] TriggerExample: deadlock between trigger recovery and MQTT service starting
---
.../org/apache/iotdb/trigger/TriggerExample.java | 28 ++++++++++++++++++----
1 file changed, 24 insertions(+), 4 deletions(-)
diff --git a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java b/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
index 6e3ae52..d9b9dc0 100644
--- a/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
+++ b/example/trigger/src/main/java/org/apache/iotdb/trigger/TriggerExample.java
@@ -45,6 +45,14 @@ public class TriggerExample implements Trigger {
private final LocalIoTDBHandler localIoTDBHandler = new LocalIoTDBHandler();
private final MQTTHandler mqttHandler = new MQTTHandler();
+ // This field is required when the target MQTT server is the current IoTDB instance.
+ // When IoTDB restarts, the registered triggers are restored before the MQTT service starts.
+ // If this trigger called openSinkHandlers() in onCreate(), the IoTDB server would get stuck
+ // in openSinkHandlers() during recovery, because it cannot connect to the MQTT server (which
+ // has not started yet).
+ // See IOTDB-2274 for more details.
+ private volatile boolean isSinksOpenedAfterCreation = false;
+
private SlidingSizeWindowEvaluationHandler windowEvaluationHandler;
@Override
@@ -54,8 +62,6 @@ public class TriggerExample implements Trigger {
double lo = attributes.getDouble("lo");
double hi = attributes.getDouble("hi");
- openSinkHandlers();
-
windowEvaluationHandler =
new SlidingSizeWindowEvaluationHandler(
new SlidingSizeWindowConfiguration(TSDataType.DOUBLE, 5, 5),
@@ -93,19 +99,33 @@ public class TriggerExample implements Trigger {
}
@Override
- public Double fire(long timestamp, Double value) {
+ public Double fire(long timestamp, Double value) throws Exception {
+ tryOpenSinksFirstOnFire();
windowEvaluationHandler.collect(timestamp, value);
return value;
}
@Override
- public double[] fire(long[] timestamps, double[] values) {
+ public double[] fire(long[] timestamps, double[] values) throws Exception {
+ tryOpenSinksFirstOnFire();
for (int i = 0; i < timestamps.length; ++i) {
windowEvaluationHandler.collect(timestamps[i], values[i]);
}
return values;
}
+ // Opens the sink handlers once, on the first fire() call. See IOTDB-2274 for more details.
+ private void tryOpenSinksFirstOnFire() throws Exception {
+ if (!isSinksOpenedAfterCreation) {
+ synchronized (this) {
+ if (!isSinksOpenedAfterCreation) {
+ openSinkHandlers();
+ isSinksOpenedAfterCreation = true;
+ }
+ }
+ }
+ }
+
private void openSinkHandlers() throws Exception {
localIoTDBHandler.open(
new LocalIoTDBConfiguration(