You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/01/04 12:40:06 UTC

[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5351] Add StatisticsUpdaterTrigger as trigger example and fix possible IT failures

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new d7580cce2a [To rel/1.0][IOTDB-5351] Add StatisticsUpdaterTrigger as trigger example and fix possible IT failures
d7580cce2a is described below

commit d7580cce2a6820d78a920a0737a5f3a78ccd1d67
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Wed Jan 4 20:39:58 2023 +0800

    [To rel/1.0][IOTDB-5351] Add StatisticsUpdaterTrigger as trigger example and fix possible IT failures
---
 .../org/apache/iotdb/trigger/SimpleTrigger.java    |  38 -----
 .../iotdb/trigger/StatisticsUpdaterTrigger.java    | 160 +++++++++++++++++++++
 .../apache/iotdb/trigger/old/AlertingExample.java  | 108 --------------
 .../trigger/example/TriggerFireTimesCounter.java   |  26 +++-
 .../db/it/trigger/IoTDBTriggerManagementIT.java    |   8 +-
 .../IoTDBSessionInsertWithTriggerExecutionIT.java  |  10 +-
 .../src/test/resources/TriggerFireTimesCounter.jar | Bin 1397 -> 1519 bytes
 7 files changed, 192 insertions(+), 158 deletions(-)

diff --git a/example/trigger/src/main/java/org/apache/iotdb/trigger/SimpleTrigger.java b/example/trigger/src/main/java/org/apache/iotdb/trigger/SimpleTrigger.java
deleted file mode 100644
index a3c797d41e..0000000000
--- a/example/trigger/src/main/java/org/apache/iotdb/trigger/SimpleTrigger.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.trigger;
-
-import org.apache.iotdb.trigger.api.Trigger;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-
-import java.util.Arrays;
-
-public class SimpleTrigger implements Trigger {
-
-  @Override
-  public boolean fire(Tablet tablet) {
-    System.out.println("receive a tablet, device name is " + tablet.deviceId);
-    System.out.println("measurements are: ");
-    tablet
-        .getSchemas()
-        .forEach(measurementSchema -> System.out.println(measurementSchema.getMeasurementId()));
-    System.out.println("time are: " + Arrays.toString(tablet.timestamps));
-    return true;
-  }
-}
diff --git a/example/trigger/src/main/java/org/apache/iotdb/trigger/StatisticsUpdaterTrigger.java b/example/trigger/src/main/java/org/apache/iotdb/trigger/StatisticsUpdaterTrigger.java
new file mode 100644
index 0000000000..01cf1c28e1
--- /dev/null
+++ b/example/trigger/src/main/java/org/apache/iotdb/trigger/StatisticsUpdaterTrigger.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.trigger;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.trigger.api.Trigger;
+import org.apache.iotdb.trigger.api.TriggerAttributes;
+import org.apache.iotdb.trigger.api.enums.FailureStrategy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class StatisticsUpdaterTrigger implements Trigger {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsUpdaterTrigger.class);
+
+  private static final String TARGET_DEVICE = "root.__system.statistics";
+
+  private static final String TARGET_SERIES = "total_count";
+  private String ip;
+
+  private int port;
+
+  private Session session;
+
+  private AtomicLong cnt = new AtomicLong(0);
+
+  private Future<?> updateFuture;
+
+  private final ScheduledExecutorService triggerInformationUpdateExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          "Stateful-Trigger-Statistics-Updater");
+
+  private static final long UPDATE_INTERVAL = 1000 * 20;
+
+  @Override
+  public void onCreate(TriggerAttributes attributes) throws Exception {
+    if (attributes.hasAttribute("ip")) {
+      ip = attributes.getString("ip");
+    } else {
+      throw new RuntimeException("ip is required");
+    }
+    if (attributes.hasAttribute("port")) {
+      port = Integer.parseInt(attributes.getString("port"));
+    } else {
+      throw new RuntimeException("port is required");
+    }
+  }
+
+  @Override
+  public boolean fire(Tablet tablet) throws Exception {
+    ensureSession();
+    if (tablet.bitMaps == null) {
+      cnt.addAndGet((long) tablet.rowSize * tablet.getSchemas().size());
+      return true;
+    }
+    for (int column = 0; column < tablet.getSchemas().size(); column++) {
+      BitMap bitMap = tablet.bitMaps[column];
+      if (bitMap == null) {
+        cnt.addAndGet(tablet.rowSize);
+      } else {
+        for (int row = 0; row < tablet.rowSize; row++) {
+          if (!bitMap.isMarked(row)) {
+            cnt.incrementAndGet();
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void restore() throws Exception {
+    ensureSession();
+    try {
+      SessionDataSet sessionDataSet =
+          session.executeQueryStatement(
+              String.format("select last %s from %s", TARGET_SERIES, TARGET_DEVICE));
+      if (sessionDataSet.hasNext()) {
+        cnt = new AtomicLong(sessionDataSet.next().getFields().get(0).getLongV());
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Error occurred when trying to restore stateful trigger", e);
+    }
+    LOGGER.info("###### restore ##########");
+  }
+
+  @Override
+  public void onDrop() throws Exception {
+    LOGGER.info("********** onDrop() ***********");
+    if (session != null) {
+      session.close();
+      updateFuture.cancel(true);
+    }
+  }
+
+  @Override
+  public FailureStrategy getFailureStrategy() {
+    return FailureStrategy.OPTIMISTIC;
+  }
+
+  private void ensureSession() throws IoTDBConnectionException {
+    if (session == null) {
+      session = new Session.Builder().host(ip).port(port).build();
+      session.open(false);
+      updateFuture =
+          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+              triggerInformationUpdateExecutor,
+              this::updateTask,
+              UPDATE_INTERVAL,
+              UPDATE_INTERVAL,
+              TimeUnit.MILLISECONDS);
+      LOGGER.info("Stateful-Trigger-Statistics-Updater is successfully started.");
+    }
+  }
+
+  private void updateTask() {
+    try {
+      this.session.insertRecord(
+          TARGET_DEVICE,
+          new Date().getTime(),
+          Collections.singletonList(TARGET_SERIES),
+          Collections.singletonList(TSDataType.INT64),
+          cnt.get());
+    } catch (Exception e) {
+      LOGGER.warn("Error occurred in updateTask", e);
+    }
+  }
+}
diff --git a/example/trigger/src/main/java/org/apache/iotdb/trigger/old/AlertingExample.java b/example/trigger/src/main/java/org/apache/iotdb/trigger/old/AlertingExample.java
deleted file mode 100644
index ec7fe2bce3..0000000000
--- a/example/trigger/src/main/java/org/apache/iotdb/trigger/old/AlertingExample.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.trigger.old;
-
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.engine.trigger.api.Trigger;
-import org.apache.iotdb.db.engine.trigger.sink.alertmanager.AlertManagerConfiguration;
-import org.apache.iotdb.db.engine.trigger.sink.alertmanager.AlertManagerEvent;
-import org.apache.iotdb.db.engine.trigger.sink.alertmanager.AlertManagerHandler;
-import org.apache.iotdb.trigger.api.TriggerAttributes;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-public class AlertingExample implements Trigger {
-
-  private final AlertManagerHandler alertManagerHandler = new AlertManagerHandler();
-
-  private final AlertManagerConfiguration alertManagerConfiguration =
-      new AlertManagerConfiguration("http://127.0.0.1:9093/api/v2/alerts");
-
-  private String alertname;
-
-  private final HashMap<String, String> labels = new HashMap<>();
-
-  private final HashMap<String, String> annotations = new HashMap<>();
-
-  @Override
-  public void onCreate(TriggerAttributes attributes) throws Exception {
-    alertManagerHandler.open(alertManagerConfiguration);
-
-    alertname = "alert_test";
-
-    labels.put("series", "root.ln.wf01.wt01.temperature");
-    labels.put("value", "");
-    labels.put("severity", "");
-
-    annotations.put("summary", "high temperature");
-    annotations.put("description", "{{.alertname}}: {{.series}} is {{.value}}");
-  }
-
-  @Override
-  public void onDrop() throws IOException {
-    alertManagerHandler.close();
-  }
-
-  @Override
-  public void onStart() {
-    alertManagerHandler.open(alertManagerConfiguration);
-  }
-
-  @Override
-  public void onStop() throws Exception {
-    alertManagerHandler.close();
-  }
-
-  @Override
-  public Double fire(long timestamp, Double value, PartialPath path) throws Exception {
-    if (value > 100.0) {
-      labels.put("value", String.valueOf(value));
-      labels.put("severity", "critical");
-      AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertname, labels, annotations);
-      alertManagerHandler.onEvent(alertManagerEvent);
-    } else if (value > 50.0) {
-      labels.put("value", String.valueOf(value));
-      labels.put("severity", "warning");
-      AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertname, labels, annotations);
-      alertManagerHandler.onEvent(alertManagerEvent);
-    }
-
-    return value;
-  }
-
-  @Override
-  public double[] fire(long[] timestamps, double[] values, PartialPath path) throws Exception {
-    for (double value : values) {
-      if (value > 100.0) {
-        labels.put("value", String.valueOf(value));
-        labels.put("severity", "critical");
-        AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertname, labels, annotations);
-        alertManagerHandler.onEvent(alertManagerEvent);
-      } else if (value > 50.0) {
-        labels.put("value", String.valueOf(value));
-        labels.put("severity", "warning");
-        AlertManagerEvent alertManagerEvent = new AlertManagerEvent(alertname, labels, annotations);
-        alertManagerHandler.onEvent(alertManagerEvent);
-      }
-    }
-    return values;
-  }
-}
diff --git a/integration-test/src/main/java/org/apache/iotdb/db/trigger/example/TriggerFireTimesCounter.java b/integration-test/src/main/java/org/apache/iotdb/db/trigger/example/TriggerFireTimesCounter.java
index f8358e2f46..30c58053cb 100644
--- a/integration-test/src/main/java/org/apache/iotdb/db/trigger/example/TriggerFireTimesCounter.java
+++ b/integration-test/src/main/java/org/apache/iotdb/db/trigger/example/TriggerFireTimesCounter.java
@@ -41,6 +41,8 @@ public class TriggerFireTimesCounter implements Trigger {
   private static final Logger LOGGER = LoggerFactory.getLogger(TriggerFireTimesCounter.class);
   private String TXT_PATH;
 
+  private final int LOCK_FILE_RETRY_TIME = 10;
+
   @Override
   public void onCreate(TriggerAttributes attributes) throws Exception {
     String counterName = attributes.getString("name");
@@ -70,9 +72,20 @@ public class TriggerFireTimesCounter implements Trigger {
 
   @Override
   public boolean fire(Tablet tablet) throws Exception {
-    try (FileChannel fileChannel =
-            FileChannel.open(Paths.get(TXT_PATH), StandardOpenOption.APPEND);
-        FileLock fileLock = fileChannel.tryLock()) {
+    FileLock fileLock = null;
+    FileChannel fileChannel = null;
+    int retryNum = 0;
+    try {
+      fileChannel = FileChannel.open(Paths.get(TXT_PATH), StandardOpenOption.APPEND);
+      while (fileLock == null) {
+        fileLock = fileChannel.tryLock();
+        if (fileLock == null) {
+          if (retryNum++ >= LOCK_FILE_RETRY_TIME) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+      }
       int rows = tablet.rowSize;
       if (fileLock != null && fileLock.isValid()) {
         String records = System.lineSeparator() + rows;
@@ -86,6 +99,13 @@ public class TriggerFireTimesCounter implements Trigger {
     } catch (Throwable t) {
       LOGGER.warn("TriggerFireTimesCounter error", t);
       return false;
+    } finally {
+      if (fileLock != null) {
+        fileLock.close();
+      }
+      if (fileChannel != null) {
+        fileChannel.close();
+      }
     }
     return true;
   }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/trigger/IoTDBTriggerManagementIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/trigger/IoTDBTriggerManagementIT.java
index 35801a05c7..6a7d19fb61 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/trigger/IoTDBTriggerManagementIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/trigger/IoTDBTriggerManagementIT.java
@@ -62,16 +62,16 @@ public class IoTDBTriggerManagementIT {
       "org.apache.iotdb.db.trigger.example.TriggerFireTimesCounter";
 
   private static final String STATELESS_TRIGGER_BEFORE_INSERTION_PREFIX =
-      "statelessTriggerBeforeInsertion_";
+      "statelessTriggerBeforeInsertionTriggerManagement_";
 
   private static final String STATELESS_TRIGGER_AFTER_INSERTION_PREFIX =
-      "statelessTriggerAfterInsertion_";
+      "statelessTriggerAfterInsertionTriggerManagement_";
 
   private static final String STATEFUL_TRIGGER_BEFORE_INSERTION_PREFIX =
-      "statefulTriggerBeforeInsertion_";
+      "statefulTriggerBeforeInsertionTriggerManagement_";
 
   private static final String STATEFUL_TRIGGER_AFTER_INSERTION_PREFIX =
-      "statefulTriggerAfterInsertion_";
+      "statefulTriggerAfterInsertionTriggerManagement_";
 
   private static final String STATEFUL = "STATEFUL";
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionInsertWithTriggerExecutionIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionInsertWithTriggerExecutionIT.java
index b0ac6ae476..3b8ebdaddf 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionInsertWithTriggerExecutionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionInsertWithTriggerExecutionIT.java
@@ -67,16 +67,16 @@ public class IoTDBSessionInsertWithTriggerExecutionIT {
   private final int rows = 10;
 
   private static final String STATELESS_TRIGGER_BEFORE_INSERTION_PREFIX =
-      "statelessTriggerBeforeInsertion_";
+      "statelessTriggerBeforeInsertionSession_";
 
   private static final String STATELESS_TRIGGER_AFTER_INSERTION_PREFIX =
-      "statelessTriggerAfterInsertion_";
+      "statelessTriggerAfterInsertionSession_";
 
   private static final String STATEFUL_TRIGGER_BEFORE_INSERTION_PREFIX =
-      "statefulTriggerBeforeInsertion_";
+      "statefulTriggerBeforeInsertionSession_";
 
   private static final String STATEFUL_TRIGGER_AFTER_INSERTION_PREFIX =
-      "statefulTriggerAfterInsertion_";
+      "statefulTriggerAfterInsertionSession_";
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -93,7 +93,7 @@ public class IoTDBSessionInsertWithTriggerExecutionIT {
 
   private static void createTimeSeries() {
     try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
-      session.setStorageGroup("root.test");
+      session.createDatabase("root.test");
       session.createTimeseries(
           "root.test.stateless.a", TSDataType.INT32, TSEncoding.PLAIN, CompressionType.SNAPPY);
       session.createTimeseries(
diff --git a/integration-test/src/test/resources/TriggerFireTimesCounter.jar b/integration-test/src/test/resources/TriggerFireTimesCounter.jar
index 1d4d1f8e13..dc585f9f04 100644
Binary files a/integration-test/src/test/resources/TriggerFireTimesCounter.jar and b/integration-test/src/test/resources/TriggerFireTimesCounter.jar differ