You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by le...@apache.org on 2022/01/29 16:30:40 UTC

[iotdb] branch kyy-2022 updated: add MAC implemented using UDTF

This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch kyy-2022
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/kyy-2022 by this push:
     new 37dbfbd  add MAC implemented using UDTF
37dbfbd is described below

commit 37dbfbd19fb336c383e2963e3a3fcfac77a6032d
Author: Lei Rui <10...@qq.com>
AuthorDate: Sun Jan 30 00:19:42 2022 +0800

    add MAC implemented using UDTF
---
 .../db/query/udf/builtin/BuiltinFunction.java      |   1 +
 .../iotdb/db/query/udf/builtin/UDTFM4MAC.java      | 462 +++++++++++++++++++++
 .../apache/iotdb/db/integration/m4/MyTestM4.java   | 209 ++++++++++
 3 files changed, 672 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
index 92e25f8..fc8da6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
@@ -47,6 +47,7 @@ public enum BuiltinFunction {
   NON_NEGATIVE_DERIVATIVE("NON_NEGATIVE_DERIVATIVE", UDTFNonNegativeDerivative.class),
   TOP_K("TOP_K", UDTFTopK.class),
   BOTTOM_K("BOTTOM_K", UDTFBottomK.class),
+  M4("M4", UDTFM4MAC.class),
   ;
 
   private final String functionName;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java
new file mode 100644
index 0000000..8d7207d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+/**
+ * M4 aggregation implemented as a UDTF that reads every point of the series, mimicking MAC (merge
+ * all chunks).
+ *
+ * <p>The query range [tqs, tqe) is split into {@code w} windows of equal length. For each window
+ * one TEXT value is emitted, timestamped with the window start, describing the first, last,
+ * bottom (minimum value) and top (maximum value) points of the window, or {@code "empty"} if no
+ * point fell into it.
+ *
+ * <p>The integration test for MAC is in org.apache.iotdb.db.integration.m4.MyTestM4.testMAC().
+ *
+ * <p>NOTE(review): a window is sealed as soon as a point belonging to a later window arrives, so
+ * rows are assumed to arrive in ascending time order — confirm for RowByRowAccessStrategy.
+ */
+public class UDTFM4MAC implements UDTF {
+
+  protected TSDataType dataType;
+  /** Start of the query range (inclusive). */
+  protected long tqs;
+  /** End of the query range (exclusive). */
+  protected long tqe;
+  /** Number of equal-length windows the query range is split into. */
+  protected int w;
+
+  // times of the first / last point seen in the current window
+  private long minTime;
+  private long maxTime;
+
+  // times of the points holding the minimum / maximum value in the current window
+  private long bottomTime;
+  private long topTime;
+
+  private int intMaxV;
+  private long longMaxV;
+  private float floatMaxV;
+  private double doubleMaxV;
+
+  private int intMinV;
+  private long longMinV;
+  private float floatMinV;
+  private double doubleMinV;
+
+  private int intFirstV;
+  private long longFirstV;
+  private float floatFirstV;
+  private double doubleFirstV;
+
+  private int intLastV;
+  private long longLastV;
+  private float floatLastV;
+  private double doubleLastV;
+
+  /** Whether the current window (index {@code idx}) has received at least one point. */
+  private boolean hasPointInInterval;
+
+  /** One entry per window; stays "empty" for windows that never receive a point. */
+  private String[] result;
+  /** Index of the window currently being accumulated. */
+  private int idx;
+
+  /** Resets the per-window accumulators before a new window starts. */
+  private void init() {
+    this.hasPointInInterval = false;
+
+    this.minTime = Long.MAX_VALUE;
+    this.maxTime = Long.MIN_VALUE;
+
+    this.intFirstV = 0;
+    this.longFirstV = 0;
+    this.floatFirstV = 0;
+    this.doubleFirstV = 0;
+
+    this.intLastV = 0;
+    this.longLastV = 0;
+    this.floatLastV = 0;
+    this.doubleLastV = 0;
+
+    this.bottomTime = 0;
+    this.topTime = 0;
+
+    this.intMinV = Integer.MAX_VALUE;
+    this.longMinV = Long.MAX_VALUE;
+    this.floatMinV = Float.POSITIVE_INFINITY;
+    this.doubleMinV = Double.POSITIVE_INFINITY;
+
+    // Float.MIN_VALUE / Double.MIN_VALUE are the smallest *positive* values, not the most negative
+    // ones, so negative infinity is the correct starting point for a running maximum.
+    this.intMaxV = Integer.MIN_VALUE;
+    this.longMaxV = Long.MIN_VALUE;
+    this.floatMaxV = Float.NEGATIVE_INFINITY;
+    this.doubleMaxV = Double.NEGATIVE_INFINITY;
+  }
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws UDFException {
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(
+            0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE)
+        .validateRequiredAttribute("tqs")
+        .validateRequiredAttribute("tqe")
+        .validateRequiredAttribute("w");
+  }
+
+  /**
+   * Reads the query range and window count, checks that they describe at least one window of
+   * positive integer length, and prepares one "empty" result slot per window.
+   *
+   * @throws MetadataException if {@code w <= 0}, {@code tqe <= tqs}, or {@code tqe - tqs} is not
+   *     divisible by {@code w}
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws MetadataException {
+    dataType = parameters.getDataType(0);
+    tqs = parameters.getLong("tqs"); // closed
+    tqe = parameters.getLong("tqe"); // open
+    w = parameters.getInt("w");
+    if (w <= 0) {
+      throw new MetadataException("w should be a positive integer, but got " + w);
+    }
+    if (tqe <= tqs) {
+      throw new MetadataException(
+          String.format("tqe (%d) should be greater than tqs (%d)", tqe, tqs));
+    }
+    if ((tqe - tqs) % w != 0) {
+      throw new MetadataException("You should make tqe-tqs integer divide w");
+    }
+    configurations
+        .setAccessStrategy(new RowByRowAccessStrategy())
+        .setOutputDataType(TSDataType.TEXT);
+    init();
+    this.idx = 0;
+    result = new String[w];
+    for (int i = 0; i < w; i++) {
+      result[i] = "empty";
+    }
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector)
+      throws QueryProcessException, IOException {
+    switch (dataType) {
+      case INT32:
+        transformInt(row.getTime(), row.getInt(0));
+        break;
+      case INT64:
+        transformLong(row.getTime(), row.getLong(0));
+        break;
+      case FLOAT:
+        transformFloat(row.getTime(), row.getFloat(0));
+        break;
+      case DOUBLE:
+        transformDouble(row.getTime(), row.getDouble(0));
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Returns the index of the window that contains {@code time}.
+   *
+   * @throws IOException if {@code time} lies outside [tqs, tqe)
+   */
+  private int windowIndex(long time) throws IOException {
+    long intervalLen = (tqe - tqs) / w;
+    // exact integer floor division; avoids double rounding for very large time offsets
+    long pos = Math.floorDiv(time - tqs, intervalLen);
+    if (pos < 0 || pos >= w) {
+      throw new IOException("Make sure the range time filter is time>=tqs and time<tqe");
+    }
+    return (int) pos;
+  }
+
+  /** Seals the current window and starts window {@code pos} if {@code pos} lies beyond it. */
+  private void advanceTo(int pos) {
+    if (pos > idx) {
+      flushCurrentInterval();
+      idx = pos;
+      init(); // clear environment for this new interval
+    }
+  }
+
+  /** Writes the current window's summary into result[idx]; windows without points stay "empty". */
+  private void flushCurrentInterval() {
+    if (!hasPointInInterval) {
+      return;
+    }
+    switch (dataType) {
+      case INT32:
+        result[idx] = formatInterval(intFirstV, intLastV, intMinV, intMaxV);
+        break;
+      case INT64:
+        result[idx] = formatInterval(longFirstV, longLastV, longMinV, longMaxV);
+        break;
+      case FLOAT:
+        result[idx] = formatInterval(floatFirstV, floatLastV, floatMinV, floatMaxV);
+        break;
+      case DOUBLE:
+        result[idx] = formatInterval(doubleFirstV, doubleLastV, doubleMinV, doubleMaxV);
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Renders the four M4 points of the current window. Values are boxed, so their text matches the
+   * primitive's own string conversion.
+   */
+  private String formatInterval(Object firstV, Object lastV, Object minV, Object maxV) {
+    return "FirstPoint=("
+        + minTime
+        + ","
+        + firstV
+        + "), "
+        + "LastPoint=("
+        + maxTime
+        + ","
+        + lastV
+        + "), "
+        + "BottomPoint=("
+        + bottomTime
+        + ","
+        + minV
+        + "), "
+        + "TopPoint=("
+        + topTime
+        + ","
+        + maxV
+        + ")";
+  }
+
+  protected void transformInt(long time, int value) throws IOException {
+    advanceTo(windowIndex(time));
+    // the first point of a window initialises every statistic, whatever the sentinel values are
+    boolean first = !hasPointInInterval;
+    if (first || time < minTime) {
+      minTime = time;
+      intFirstV = value;
+    }
+    if (first || time > maxTime) {
+      maxTime = time;
+      intLastV = value;
+    }
+    if (first || value < intMinV) {
+      bottomTime = time;
+      intMinV = value;
+    }
+    if (first || value > intMaxV) {
+      topTime = time;
+      intMaxV = value;
+    }
+    hasPointInInterval = true;
+  }
+
+  protected void transformLong(long time, long value) throws IOException {
+    advanceTo(windowIndex(time));
+    boolean first = !hasPointInInterval;
+    if (first || time < minTime) {
+      minTime = time;
+      longFirstV = value;
+    }
+    if (first || time > maxTime) {
+      maxTime = time;
+      longLastV = value;
+    }
+    if (first || value < longMinV) {
+      bottomTime = time;
+      longMinV = value;
+    }
+    if (first || value > longMaxV) {
+      topTime = time;
+      longMaxV = value;
+    }
+    hasPointInInterval = true;
+  }
+
+  protected void transformFloat(long time, float value) throws IOException {
+    advanceTo(windowIndex(time));
+    boolean first = !hasPointInInterval;
+    if (first || time < minTime) {
+      minTime = time;
+      floatFirstV = value;
+    }
+    if (first || time > maxTime) {
+      maxTime = time;
+      floatLastV = value;
+    }
+    if (first || value < floatMinV) {
+      bottomTime = time;
+      floatMinV = value;
+    }
+    if (first || value > floatMaxV) {
+      topTime = time;
+      floatMaxV = value;
+    }
+    hasPointInInterval = true;
+  }
+
+  protected void transformDouble(long time, double value) throws IOException {
+    advanceTo(windowIndex(time));
+    boolean first = !hasPointInInterval;
+    if (first || time < minTime) {
+      minTime = time;
+      doubleFirstV = value;
+    }
+    if (first || time > maxTime) {
+      maxTime = time;
+      doubleLastV = value;
+    }
+    if (first || value < doubleMinV) {
+      bottomTime = time;
+      doubleMinV = value;
+    }
+    if (first || value > doubleMaxV) {
+      topTime = time;
+      doubleMaxV = value;
+    }
+    hasPointInInterval = true;
+  }
+
+  /**
+   * Seals the last window that received data and emits one row per window, timestamped with the
+   * window's start time.
+   */
+  @Override
+  public void terminate(PointCollector collector) throws IOException, QueryProcessException {
+    // record the last interval (not necessarily idx=w-1) in the transform stage
+    flushCurrentInterval();
+    // collect result
+    long intervalLen = (tqe - tqs) / w;
+    for (int i = 0; i < w; i++) {
+      long startInterval = tqs + intervalLen * i;
+      collector.putString(startInterval, result[i]);
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTestM4.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTestM4.java
new file mode 100644
index 0000000..d2aabb8
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTestM4.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.m4;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static org.junit.Assert.fail;
+
+public class MyTestM4 {
+
+  private static final String TIMESTAMP_STR = "Time";
+
+  private static final String[] creationSqls =
+      new String[] {
+        "SET STORAGE GROUP TO root.vehicle.d0",
+        "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=FLOAT, ENCODING=RLE",
+      };
+
+  private final String d0s0 = "root.vehicle.d0.s0";
+
+  private static final String insertTemplate =
+      "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%d)";
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData1();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+  }
+
+  /**
+   * MAC: merge all chunks. Use UDTF to mimic the process of merging all chunks to calculate
+   * aggregation points.
+   */
+  @Test
+  public void testMAC() {
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      // M4 is registered as a built-in function (BuiltinFunction.M4 -> UDTFM4MAC), so it is not
+      // created here: CREATE FUNCTION with the same name is redundant and may be rejected as a
+      // clash with the built-in name.
+      long tqs = 0L;
+      long tqe = 100L;
+      int w = 4;
+      boolean hasResultSet =
+          statement.execute(
+              String.format(
+                  "select M4(s0,'tqs'='%1$d','tqe'='%2$d','w'='%3$d') from root.vehicle.d0 where "
+                      + "time>=%1$d and time<%2$d",
+                  tqs, tqe, w));
+
+      Assert.assertTrue(hasResultSet);
+      // output column header for the query above, derived from the same parameters
+      String column =
+          String.format(
+              "M4(%s, \"tqs\"=\"%d\", \"tqe\"=\"%d\", \"w\"=\"%d\")", d0s0, tqs, tqe, w);
+      int cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(column);
+          System.out.println(ans);
+          cnt++;
+        }
+      }
+      // UDTFM4MAC emits exactly one row per window, including windows without data
+      Assert.assertEquals(w, cnt);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /** MOC: merge overlapping chunks. This is what IoTDB does. */
+  @Test
+  public void testMOC() {
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "SELECT min_time(s0),first_value(s0),"
+                  + "max_time(s0), last_value(s0),"
+                  + "max_value(s0), min_value(s0)"
+                  + " FROM root.vehicle.d0 group by ([0,100),25ms)");
+
+      Assert.assertTrue(hasResultSet);
+      int cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(String.format("min_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("first_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_time(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("last_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("max_value(%s)", d0s0))
+                  + ","
+                  + resultSet.getString(String.format("min_value(%s)", d0s0));
+          System.out.println(ans);
+          cnt++;
+        }
+      }
+      // group by ([0,100),25ms) covers four 25ms windows
+      Assert.assertEquals(4, cnt);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * The data written is shown in figure:
+   * https://user-images.githubusercontent.com/33376433/151664843-6afcb40d-fe6e-4ea8-bdd6-efe467f40c1c.png
+   *
+   * @throws SQLException if any statement fails; propagated so a broken setup fails the test class
+   *     instead of being silently ignored
+   */
+  private static void prepareData1() throws SQLException {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+
+      // seq1
+      for (int i = 0; i <= 99; i++) {
+        statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i, i));
+      }
+      statement.executeBatch();
+      statement.clearBatch();
+      statement.execute("FLUSH");
+
+      // unseq2
+      for (int i = 10; i <= 28; i++) {
+        statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i, i));
+      }
+      statement.executeBatch();
+      statement.clearBatch();
+      statement.execute("FLUSH");
+
+      // unseq3
+      for (int i = 29; i <= 36; i++) {
+        statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i, i));
+      }
+      statement.executeBatch();
+      statement.clearBatch();
+      statement.execute("FLUSH");
+
+      // unseq4
+      for (int i = 37; i <= 60; i++) {
+        statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i, i));
+      }
+      statement.executeBatch();
+      statement.clearBatch();
+      statement.execute("FLUSH");
+
+      statement.execute("delete from root.vehicle.d0.s0 where time>=26 and time<=27");
+      statement.execute("delete from root.vehicle.d0.s0 where time>=35 and time<=40");
+      statement.execute("delete from root.vehicle.d0.s0 where time>=48 and time<=75");
+      statement.execute("delete from root.vehicle.d0.s0 where time>=85");
+    }
+  }
+}