You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/01/04 12:40:06 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5351] Add StatisticsUpdaterTrigger as trigger example and fix possible IT failures
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new d7580cce2a [To rel/1.0][IOTDB-5351] Add StatisticsUpdaterTrigger as trigger example and fix possible IT failures
d7580cce2a is described below
commit d7580cce2a6820d78a920a0737a5f3a78ccd1d67
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Wed Jan 4 20:39:58 2023 +0800
[To rel/1.0][IOTDB-5351] Add StatisticsUpdaterTrigger as trigger example and fix possible IT failures
---
.../org/apache/iotdb/trigger/SimpleTrigger.java | 38 -----
.../iotdb/trigger/StatisticsUpdaterTrigger.java | 160 +++++++++++++++++++++
.../apache/iotdb/trigger/old/AlertingExample.java | 108 --------------
.../trigger/example/TriggerFireTimesCounter.java | 26 +++-
.../db/it/trigger/IoTDBTriggerManagementIT.java | 8 +-
.../IoTDBSessionInsertWithTriggerExecutionIT.java | 10 +-
.../src/test/resources/TriggerFireTimesCounter.jar | Bin 1397 -> 1519 bytes
7 files changed, 192 insertions(+), 158 deletions(-)
diff --git a/example/trigger/src/main/java/org/apache/iotdb/trigger/SimpleTrigger.java b/example/trigger/src/main/java/org/apache/iotdb/trigger/SimpleTrigger.java
deleted file mode 100644
index a3c797d41e..0000000000
--- a/example/trigger/src/main/java/org/apache/iotdb/trigger/SimpleTrigger.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.trigger;
-
-import org.apache.iotdb.trigger.api.Trigger;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-
-import java.util.Arrays;
-
-public class SimpleTrigger implements Trigger {
-
- @Override
- public boolean fire(Tablet tablet) {
- System.out.println("receive a tablet, device name is " + tablet.deviceId);
- System.out.println("measurements are: ");
- tablet
- .getSchemas()
- .forEach(measurementSchema -> System.out.println(measurementSchema.getMeasurementId()));
- System.out.println("time are: " + Arrays.toString(tablet.timestamps));
- return true;
- }
-}
diff --git a/example/trigger/src/main/java/org/apache/iotdb/trigger/StatisticsUpdaterTrigger.java b/example/trigger/src/main/java/org/apache/iotdb/trigger/StatisticsUpdaterTrigger.java
new file mode 100644
index 0000000000..01cf1c28e1
--- /dev/null
+++ b/example/trigger/src/main/java/org/apache/iotdb/trigger/StatisticsUpdaterTrigger.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.trigger;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.trigger.api.Trigger;
+import org.apache.iotdb.trigger.api.TriggerAttributes;
+import org.apache.iotdb.trigger.api.enums.FailureStrategy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Example stateful trigger that counts the data points passing through {@link #fire(Tablet)} and
+ * periodically writes the running total to {@code root.__system.statistics.total_count} through
+ * its own {@link Session}.
+ *
+ * <p>Required trigger attributes: {@code ip} and {@code port} of the endpoint the session connects
+ * to.
+ */
+public class StatisticsUpdaterTrigger implements Trigger {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsUpdaterTrigger.class);
+
+  /** Device that receives the persisted counter. */
+  private static final String TARGET_DEVICE = "root.__system.statistics";
+
+  /** Measurement under {@link #TARGET_DEVICE} that stores the counter value. */
+  private static final String TARGET_SERIES = "total_count";
+
+  /** Delay between two counter write-backs, in milliseconds (20 seconds). */
+  private static final long UPDATE_INTERVAL = 1000 * 20;
+
+  private String ip;
+
+  private int port;
+
+  /** Lazily opened by {@link #ensureSession()}; stays {@code null} until a connect succeeds. */
+  private Session session;
+
+  /** Running total; shared between the firing thread(s) and the scheduled updater task. */
+  private final AtomicLong cnt = new AtomicLong(0);
+
+  /** Handle of the periodic write-back task; {@code null} until the session has been opened. */
+  private Future<?> updateFuture;
+
+  private final ScheduledExecutorService triggerInformationUpdateExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          "Stateful-Trigger-Statistics-Updater");
+
+  /**
+   * Reads the mandatory {@code ip} and {@code port} attributes.
+   *
+   * @param attributes attributes supplied when the trigger was created
+   * @throws IllegalArgumentException if {@code ip} or {@code port} is missing, or if {@code port}
+   *     is not a valid integer ({@link NumberFormatException})
+   */
+  @Override
+  public void onCreate(TriggerAttributes attributes) throws Exception {
+    if (!attributes.hasAttribute("ip")) {
+      throw new IllegalArgumentException("ip is required");
+    }
+    ip = attributes.getString("ip");
+
+    if (!attributes.hasAttribute("port")) {
+      throw new IllegalArgumentException("port is required");
+    }
+    port = Integer.parseInt(attributes.getString("port"));
+  }
+
+  /**
+   * Adds the number of unmarked cells of {@code tablet} to the counter.
+   *
+   * <p>A column without a bitmap contributes {@code rowSize} points; otherwise only rows whose bit
+   * is not marked are counted (a marked bit presumably denotes a null value - confirm against the
+   * Tablet documentation).
+   *
+   * @param tablet the data being inserted
+   * @return always {@code true}
+   * @throws Exception if the session cannot be opened
+   */
+  @Override
+  public boolean fire(Tablet tablet) throws Exception {
+    ensureSession();
+    int columnCount = tablet.getSchemas().size();
+    if (tablet.bitMaps == null) {
+      // Widen before multiplying so the product cannot overflow int.
+      cnt.addAndGet((long) tablet.rowSize * columnCount);
+      return true;
+    }
+    for (int column = 0; column < columnCount; column++) {
+      BitMap bitMap = tablet.bitMaps[column];
+      if (bitMap == null) {
+        cnt.addAndGet(tablet.rowSize);
+        continue;
+      }
+      for (int row = 0; row < tablet.rowSize; row++) {
+        if (!bitMap.isMarked(row)) {
+          cnt.incrementAndGet();
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Reloads the counter from the last persisted value of the target series. Query failures are
+   * logged and leave the counter unchanged.
+   *
+   * @throws Exception if the session cannot be opened
+   */
+  @Override
+  public void restore() throws Exception {
+    ensureSession();
+    try {
+      SessionDataSet sessionDataSet =
+          session.executeQueryStatement(
+              String.format("select last %s from %s", TARGET_SERIES, TARGET_DEVICE));
+      try {
+        if (sessionDataSet.hasNext()) {
+          // NOTE(review): confirm that field 0 of a "select last" row is the numeric value and
+          // not the series name; the original code reads index 0 and that is kept here.
+          cnt.set(sessionDataSet.next().getFields().get(0).getLongV());
+        }
+      } finally {
+        // Release the server-side query handle; the original never closed the data set.
+        sessionDataSet.closeOperationHandle();
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Error occurred when trying to restore stateful trigger", e);
+    }
+    LOGGER.info("###### restore ##########");
+  }
+
+  /**
+   * Stops the periodic updater, closes the session and shuts the updater executor down.
+   *
+   * @throws Exception if closing the session fails
+   */
+  @Override
+  public void onDrop() throws Exception {
+    LOGGER.info("********** onDrop() ***********");
+    try {
+      // Cancel first so the updater cannot run against a session that is being closed.
+      if (updateFuture != null) {
+        updateFuture.cancel(true);
+      }
+      if (session != null) {
+        session.close();
+      }
+    } finally {
+      // The executor belongs to this instance; without this its worker thread would leak.
+      triggerInformationUpdateExecutor.shutdownNow();
+    }
+  }
+
+  @Override
+  public FailureStrategy getFailureStrategy() {
+    return FailureStrategy.OPTIMISTIC;
+  }
+
+  /**
+   * Opens the session and schedules the periodic write-back on first use; later calls are no-ops.
+   *
+   * <p>The field is only assigned after {@code open} succeeded, so a failed connect is retried on
+   * the next call instead of leaving an unusable session behind.
+   *
+   * @throws IoTDBConnectionException if the session cannot be opened
+   */
+  private void ensureSession() throws IoTDBConnectionException {
+    if (session != null) {
+      return;
+    }
+    Session newSession = new Session.Builder().host(ip).port(port).build();
+    newSession.open(false);
+    session = newSession;
+    updateFuture =
+        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+            triggerInformationUpdateExecutor,
+            this::updateTask,
+            UPDATE_INTERVAL,
+            UPDATE_INTERVAL,
+            TimeUnit.MILLISECONDS);
+    LOGGER.info("Stateful-Trigger-Statistics-Updater is successfully started.");
+  }
+
+  /**
+   * Writes the current counter value to the target series, timestamped with the current wall-clock
+   * time. Failures are logged and not rethrown.
+   */
+  private void updateTask() {
+    try {
+      this.session.insertRecord(
+          TARGET_DEVICE,
+          new Date().getTime(),
+          Collections.singletonList(TARGET_SERIES),
+          Collections.singletonList(TSDataType.INT64),
+          cnt.get());
+    } catch (Exception e) {
+      LOGGER.warn("Error occurred in updateTask", e);
+    }
+  }
+}
diff --git a/example/trigger/src/main/java/org/apache/iotdb/trigger/old/AlertingExample.java b/example/trigger/src/main/java/org/apache/iotdb/trigger/old/AlertingExample.java
deleted file mode 100644
index ec7fe2bce3..0000000000
--- a/example/trigger/src/main/java/org/apache/iotdb/trigger/old/AlertingExample.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.trigger.old;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.trigger.api.Trigger;
-import org.apache.iotdb.db.engine.trigger.sink.alertmanager.AlertManagerConfiguration;
-import org.apache.iotdb.db.engine.trigger.sink.alertmanager.AlertManagerEvent;
-import org.apache.iotdb.db.engine.trigger.sink.alertmanager.AlertManagerHandler;
-import org.apache.iotdb.trigger.api.TriggerAttributes;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-public class AlertingExample implements Trigger {
-
- private final AlertManagerHandler alertManagerHandler = new AlertManagerHandler();
-
- private final AlertManagerConfiguration alertManagerConfiguration =
- new AlertManagerConfiguration("http://127.0.0.1:9093/api/v2/alerts");
-
- private String alertname;
-
- private final HashMap<String, String> labels = new HashMap<>();
-
- private final HashMap<String, String> annotations = new HashMap<>();
-
- @Override
- public void onCreate(TriggerAttributes attributes) throws Exception {
- alertManagerHandler.open(alertManagerConfiguration);
-
- alertname = "alert_test";
-
- labels.put("series", "root.ln.wf01.wt01.temperature");
- labels.put("value", "");
- labels.put("severity", "");
-
- annotations.put("summary", "high temperature");
- annotations.put("description", "{{.alertname}}: {{.series}} is {{.value}}");
- }
-
- @Override
- public void onDrop() throws IOException {
- alertManagerHandler.close();
- }
-
- @Override
- public void onStart() {
- alertManagerHandler.open(alertManagerConfiguration);
- }
-
- @Override
- public void onStop() throws Exception {
- alertManagerHandler.close();
- }
-
- @Override
- public Double fire(long timestamp, Double value, PartialPath path) throws Exception {
- if (value > 100.0) {
- labels.put("value", String.valueOf(value));
- labels.put("severity", "critical");
- AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertname, labels, annotations);
- alertManagerHandler.onEvent(alertManagerEvent);
- } else if (value > 50.0) {
- labels.put("value", String.valueOf(value));
- labels.put("severity", "warning");
- AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertname, labels, annotations);
- alertManagerHandler.onEvent(alertManagerEvent);
- }
-
- return value;
- }
-
- @Override
- public double[] fire(long[] timestamps, double[] values, PartialPath path) throws Exception {
- for (double value : values) {
- if (value > 100.0) {
- labels.put("value", String.valueOf(value));
- labels.put("severity", "critical");
- AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertname, labels, annotations);
- alertManagerHandler.onEvent(alertManagerEvent);
- } else if (value > 50.0) {
- labels.put("value", String.valueOf(value));
- labels.put("severity", "warning");
- AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertname, labels, annotations);
- alertManagerHandler.onEvent(alertManagerEvent);
- }
- }
- return values;
- }
-}
diff --git a/integration-test/src/main/java/org/apache/iotdb/db/trigger/example/TriggerFireTimesCounter.java b/integration-test/src/main/java/org/apache/iotdb/db/trigger/example/TriggerFireTimesCounter.java
index f8358e2f46..30c58053cb 100644
--- a/integration-test/src/main/java/org/apache/iotdb/db/trigger/example/TriggerFireTimesCounter.java
+++ b/integration-test/src/main/java/org/apache/iotdb/db/trigger/example/TriggerFireTimesCounter.java
@@ -41,6 +41,8 @@ public class TriggerFireTimesCounter implements Trigger {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggerFireTimesCounter.class);
private String TXT_PATH;
+ private final int LOCK_FILE_RETRY_TIME = 10;
+
@Override
public void onCreate(TriggerAttributes attributes) throws Exception {
String counterName = attributes.getString("name");
@@ -70,9 +72,20 @@ public class TriggerFireTimesCounter implements Trigger {
@Override
public boolean fire(Tablet tablet) throws Exception {
- try (FileChannel fileChannel =
- FileChannel.open(Paths.get(TXT_PATH), StandardOpenOption.APPEND);
- FileLock fileLock = fileChannel.tryLock()) {
+ FileLock fileLock = null;
+ FileChannel fileChannel = null;
+ int retryNum = 0;
+ try {
+ fileChannel = FileChannel.open(Paths.get(TXT_PATH), StandardOpenOption.APPEND);
+ while (fileLock == null) {
+ fileLock = fileChannel.tryLock();
+ if (fileLock == null) {
+ if (retryNum++ >= LOCK_FILE_RETRY_TIME) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ }
int rows = tablet.rowSize;
if (fileLock != null && fileLock.isValid()) {
String records = System.lineSeparator() + rows;
@@ -86,6 +99,13 @@ public class TriggerFireTimesCounter implements Trigger {
} catch (Throwable t) {
LOGGER.warn("TriggerFireTimesCounter error", t);
return false;
+ } finally {
+ if (fileLock != null) {
+ fileLock.close();
+ }
+ if (fileChannel != null) {
+ fileChannel.close();
+ }
}
return true;
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/trigger/IoTDBTriggerManagementIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/trigger/IoTDBTriggerManagementIT.java
index 35801a05c7..6a7d19fb61 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/trigger/IoTDBTriggerManagementIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/trigger/IoTDBTriggerManagementIT.java
@@ -62,16 +62,16 @@ public class IoTDBTriggerManagementIT {
"org.apache.iotdb.db.trigger.example.TriggerFireTimesCounter";
private static final String STATELESS_TRIGGER_BEFORE_INSERTION_PREFIX =
- "statelessTriggerBeforeInsertion_";
+ "statelessTriggerBeforeInsertionTriggerManagement_";
private static final String STATELESS_TRIGGER_AFTER_INSERTION_PREFIX =
- "statelessTriggerAfterInsertion_";
+ "statelessTriggerAfterInsertionTriggerManagement_";
private static final String STATEFUL_TRIGGER_BEFORE_INSERTION_PREFIX =
- "statefulTriggerBeforeInsertion_";
+ "statefulTriggerBeforeInsertionTriggerManagement_";
private static final String STATEFUL_TRIGGER_AFTER_INSERTION_PREFIX =
- "statefulTriggerAfterInsertion_";
+ "statefulTriggerAfterInsertionTriggerManagement_";
private static final String STATEFUL = "STATEFUL";
diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionInsertWithTriggerExecutionIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionInsertWithTriggerExecutionIT.java
index b0ac6ae476..3b8ebdaddf 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionInsertWithTriggerExecutionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionInsertWithTriggerExecutionIT.java
@@ -67,16 +67,16 @@ public class IoTDBSessionInsertWithTriggerExecutionIT {
private final int rows = 10;
private static final String STATELESS_TRIGGER_BEFORE_INSERTION_PREFIX =
- "statelessTriggerBeforeInsertion_";
+ "statelessTriggerBeforeInsertionSession_";
private static final String STATELESS_TRIGGER_AFTER_INSERTION_PREFIX =
- "statelessTriggerAfterInsertion_";
+ "statelessTriggerAfterInsertionSession_";
private static final String STATEFUL_TRIGGER_BEFORE_INSERTION_PREFIX =
- "statefulTriggerBeforeInsertion_";
+ "statefulTriggerBeforeInsertionSession_";
private static final String STATEFUL_TRIGGER_AFTER_INSERTION_PREFIX =
- "statefulTriggerAfterInsertion_";
+ "statefulTriggerAfterInsertionSession_";
@BeforeClass
public static void setUp() throws Exception {
@@ -93,7 +93,7 @@ public class IoTDBSessionInsertWithTriggerExecutionIT {
private static void createTimeSeries() {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
- session.setStorageGroup("root.test");
+ session.createDatabase("root.test");
session.createTimeseries(
"root.test.stateless.a", TSDataType.INT32, TSEncoding.PLAIN, CompressionType.SNAPPY);
session.createTimeseries(
diff --git a/integration-test/src/test/resources/TriggerFireTimesCounter.jar b/integration-test/src/test/resources/TriggerFireTimesCounter.jar
index 1d4d1f8e13..dc585f9f04 100644
Binary files a/integration-test/src/test/resources/TriggerFireTimesCounter.jar and b/integration-test/src/test/resources/TriggerFireTimesCounter.jar differ