You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by le...@apache.org on 2022/01/29 16:30:40 UTC
[iotdb] branch kyy-2022 updated: add MAC implemented using UDTF
This is an automated email from the ASF dual-hosted git repository.
leirui pushed a commit to branch kyy-2022
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/kyy-2022 by this push:
new 37dbfbd add MAC implemented using UDTF
37dbfbd is described below
commit 37dbfbd19fb336c383e2963e3a3fcfac77a6032d
Author: Lei Rui <10...@qq.com>
AuthorDate: Sun Jan 30 00:19:42 2022 +0800
add MAC implemented using UDTF
---
.../db/query/udf/builtin/BuiltinFunction.java | 1 +
.../iotdb/db/query/udf/builtin/UDTFM4MAC.java | 462 +++++++++++++++++++++
.../apache/iotdb/db/integration/m4/MyTestM4.java | 209 ++++++++++
3 files changed, 672 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
index 92e25f8..fc8da6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
@@ -47,6 +47,7 @@ public enum BuiltinFunction {
NON_NEGATIVE_DERIVATIVE("NON_NEGATIVE_DERIVATIVE", UDTFNonNegativeDerivative.class),
TOP_K("TOP_K", UDTFTopK.class),
BOTTOM_K("BOTTOM_K", UDTFBottomK.class),
+ M4("M4", UDTFM4MAC.class),
;
private final String functionName;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java
new file mode 100644
index 0000000..8d7207d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFM4MAC.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+// The integration test for MAC is in org.apache.iotdb.db.integration.m4.MyTestM4.testMAC()
+public class UDTFM4MAC implements UDTF {
+
+ protected TSDataType dataType;
+ protected long tqs;
+ protected long tqe;
+ protected int w;
+
+ private long minTime;
+ private long maxTime;
+
+ private long bottomTime;
+ private long topTime;
+
+ private int intMaxV;
+ private long longMaxV;
+ private float floatMaxV;
+ private double doubleMaxV;
+
+ private int intMinV;
+ private long longMinV;
+ private float floatMinV;
+ private double doubleMinV;
+
+ private int intFirstV;
+ private long longFirstV;
+ private float floatFirstV;
+ private double doubleFirstV;
+
+ private int intLastV;
+ private long longLastV;
+ private float floatLastV;
+ private double doubleLastV;
+
+ private String[] result;
+ private int idx;
+
+ private void init() {
+ this.minTime = Long.MAX_VALUE;
+ this.maxTime = Long.MIN_VALUE;
+
+ this.intFirstV = 0;
+ this.longFirstV = 0;
+ this.floatFirstV = 0;
+ this.doubleFirstV = 0;
+
+ this.intLastV = 0;
+ this.longLastV = 0;
+ this.floatLastV = 0;
+ this.doubleLastV = 0;
+
+ this.bottomTime = 0;
+ this.topTime = 0;
+
+ this.intMinV = Integer.MAX_VALUE;
+ this.longMinV = Long.MAX_VALUE;
+ this.floatMinV = Float.MAX_VALUE;
+ this.doubleMinV = Double.MAX_VALUE;
+
+ this.intMaxV = Integer.MIN_VALUE;
+ this.longMaxV = Long.MIN_VALUE;
+ this.floatMaxV = Float.MIN_VALUE;
+ this.doubleMaxV = Double.MIN_VALUE;
+ }
+
+ @Override
+ public void validate(UDFParameterValidator validator) throws UDFException {
+ validator
+ .validateInputSeriesNumber(1)
+ .validateInputSeriesDataType(
+ 0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE)
+ .validateRequiredAttribute("tqs")
+ .validateRequiredAttribute("tqe")
+ .validateRequiredAttribute("w");
+ }
+
+ @Override
+ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+ throws MetadataException {
+ dataType = parameters.getDataType(0);
+ tqs = parameters.getLong("tqs"); // closed
+ tqe = parameters.getLong("tqe"); // open
+ w = parameters.getInt("w");
+ if ((tqe - tqs) % w != 0) {
+ throw new MetadataException("You should make tqe-tqs integer divide w");
+ }
+ configurations
+ .setAccessStrategy(new RowByRowAccessStrategy())
+ .setOutputDataType(TSDataType.TEXT);
+ init();
+ this.idx = 0;
+ result = new String[w];
+ for (int i = 0; i < w; i++) {
+ result[i] = "empty";
+ }
+ }
+
+ @Override
+ public void transform(Row row, PointCollector collector)
+ throws QueryProcessException, IOException {
+ switch (dataType) {
+ case INT32:
+ transformInt(row.getTime(), row.getInt(0));
+ break;
+ case INT64:
+ transformLong(row.getTime(), row.getLong(0));
+ break;
+ case FLOAT:
+ transformFloat(row.getTime(), row.getFloat(0));
+ break;
+ case DOUBLE:
+ transformDouble(row.getTime(), row.getDouble(0));
+ break;
+ default:
+ break;
+ }
+ }
+
+ protected void transformInt(long time, int value) throws IOException {
+ long intervalLen = (tqe - tqs) / w;
+ int pos = (int) Math.floor((time - tqs) * 1.0 / intervalLen);
+ if (pos >= w) {
+ throw new IOException("Make sure the range time filter is time>=tqs and time<tqe");
+ }
+
+ if (pos > idx) {
+ result[idx] =
+ "FirstPoint=("
+ + minTime
+ + ","
+ + intFirstV
+ + "), "
+ + "LastPoint=("
+ + maxTime
+ + ","
+ + intLastV
+ + "), "
+ + "BottomPoint=("
+ + bottomTime
+ + ","
+ + intMinV
+ + "), "
+ + "TopPoint=("
+ + topTime
+ + ","
+ + intMaxV
+ + ")";
+ idx = pos;
+ init(); // clear environment for this new interval
+ }
+ // update for the current interval
+ if (time < minTime) {
+ minTime = time;
+ intFirstV = value;
+ }
+ if (time > maxTime) {
+ maxTime = time;
+ intLastV = value;
+ }
+ if (value < intMinV) {
+ bottomTime = time;
+ intMinV = value;
+ }
+ if (value > intMaxV) {
+ topTime = time;
+ intMaxV = value;
+ }
+ }
+
+ protected void transformLong(long time, long value) throws IOException {
+ long intervalLen = (tqe - tqs) / w;
+ int pos = (int) Math.floor((time - tqs) * 1.0 / intervalLen);
+
+ if (pos >= w) {
+ throw new IOException("Make sure the range time filter is time>=tqs and time<tqe");
+ }
+
+ if (pos > idx) {
+ result[idx] =
+ "FirstPoint=("
+ + minTime
+ + ","
+ + longFirstV
+ + "), "
+ + "LastPoint=("
+ + maxTime
+ + ","
+ + longLastV
+ + "), "
+ + "BottomPoint=("
+ + bottomTime
+ + ","
+ + longMinV
+ + "), "
+ + "TopPoint=("
+ + topTime
+ + ","
+ + longMaxV
+ + ")";
+ idx = pos;
+ init(); // clear environment for this new interval
+ }
+ if (time < minTime) {
+ minTime = time;
+ longFirstV = value;
+ }
+ if (time > maxTime) {
+ maxTime = time;
+ longLastV = value;
+ }
+ if (value < longMinV) {
+ bottomTime = time;
+ longMinV = value;
+ }
+ if (value > longMaxV) {
+ topTime = time;
+ longMaxV = value;
+ }
+ }
+
+ protected void transformFloat(long time, float value) throws IOException {
+ long intervalLen = (tqe - tqs) / w;
+ int pos = (int) Math.floor((time - tqs) * 1.0 / intervalLen);
+
+ if (pos >= w) {
+ throw new IOException("Make sure the range time filter is time>=tqs and time<tqe");
+ }
+
+ if (pos > idx) {
+ result[idx] =
+ "FirstPoint=("
+ + minTime
+ + ","
+ + floatFirstV
+ + "), "
+ + "LastPoint=("
+ + maxTime
+ + ","
+ + floatLastV
+ + "), "
+ + "BottomPoint=("
+ + bottomTime
+ + ","
+ + floatMinV
+ + "), "
+ + "TopPoint=("
+ + topTime
+ + ","
+ + floatMaxV
+ + ")";
+ idx = pos;
+ init(); // clear environment for this new interval
+ }
+ if (time < minTime) {
+ minTime = time;
+ floatFirstV = value;
+ }
+ if (time > maxTime) {
+ maxTime = time;
+ floatLastV = value;
+ }
+ if (value < floatMinV) {
+ bottomTime = time;
+ floatMinV = value;
+ }
+ if (value > floatMaxV) {
+ topTime = time;
+ floatMaxV = value;
+ }
+ }
+
+ protected void transformDouble(long time, double value) throws IOException {
+ long intervalLen = (tqe - tqs) / w;
+ int pos = (int) Math.floor((time - tqs) * 1.0 / intervalLen);
+
+ if (pos >= w) {
+ throw new IOException("Make sure the range time filter is time>=tqs and time<tqe");
+ }
+
+ if (pos > idx) {
+ result[idx] =
+ "FirstPoint=("
+ + minTime
+ + ","
+ + doubleFirstV
+ + "), "
+ + "LastPoint=("
+ + maxTime
+ + ","
+ + doubleLastV
+ + "), "
+ + "BottomPoint=("
+ + bottomTime
+ + ","
+ + doubleMinV
+ + "), "
+ + "TopPoint=("
+ + topTime
+ + ","
+ + doubleMaxV
+ + ")";
+ idx = pos;
+ init(); // clear environment for this new interval
+ }
+ if (time < minTime) {
+ minTime = time;
+ doubleFirstV = value;
+ }
+ if (time > maxTime) {
+ maxTime = time;
+ doubleLastV = value;
+ }
+ if (value < doubleMinV) {
+ bottomTime = time;
+ doubleMinV = value;
+ }
+ if (value > doubleMaxV) {
+ topTime = time;
+ doubleMaxV = value;
+ }
+ }
+
+ @Override
+ public void terminate(PointCollector collector) throws IOException, QueryProcessException {
+ // record the last interval (not necessarily idx=w-1) in the transform stage
+ switch (dataType) {
+ case INT32:
+ result[idx] =
+ "FirstPoint=("
+ + minTime
+ + ","
+ + intFirstV
+ + "), "
+ + "LastPoint=("
+ + maxTime
+ + ","
+ + intLastV
+ + "), "
+ + "BottomPoint=("
+ + bottomTime
+ + ","
+ + intMinV
+ + "), "
+ + "TopPoint=("
+ + topTime
+ + ","
+ + intMaxV
+ + ")";
+ break;
+ case INT64:
+ result[idx] =
+ "FirstPoint=("
+ + minTime
+ + ","
+ + longFirstV
+ + "), "
+ + "LastPoint=("
+ + maxTime
+ + ","
+ + longLastV
+ + "), "
+ + "BottomPoint=("
+ + bottomTime
+ + ","
+ + longMinV
+ + "), "
+ + "TopPoint=("
+ + topTime
+ + ","
+ + longMaxV
+ + ")";
+ break;
+ case FLOAT:
+ result[idx] =
+ "FirstPoint=("
+ + minTime
+ + ","
+ + floatFirstV
+ + "), "
+ + "LastPoint=("
+ + maxTime
+ + ","
+ + floatLastV
+ + "), "
+ + "BottomPoint=("
+ + bottomTime
+ + ","
+ + floatMinV
+ + "), "
+ + "TopPoint=("
+ + topTime
+ + ","
+ + floatMaxV
+ + ")";
+ break;
+ case DOUBLE:
+ result[idx] =
+ "FirstPoint=("
+ + minTime
+ + ","
+ + doubleFirstV
+ + "), "
+ + "LastPoint=("
+ + maxTime
+ + ","
+ + doubleLastV
+ + "), "
+ + "BottomPoint=("
+ + bottomTime
+ + ","
+ + doubleMinV
+ + "), "
+ + "TopPoint=("
+ + topTime
+ + ","
+ + doubleMaxV
+ + ")";
+ break;
+ default:
+ break;
+ }
+ // collect result
+ for (int i = 0; i < w; i++) {
+ long startInterval = tqs + (tqe - tqs) / w * i;
+ collector.putString(startInterval, result[i]);
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTestM4.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTestM4.java
new file mode 100644
index 0000000..d2aabb8
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTestM4.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration.m4;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static org.junit.Assert.fail;
+
+public class MyTestM4 {
+
+ private static final String TIMESTAMP_STR = "Time";
+
+ private static String[] creationSqls =
+ new String[] {
+ "SET STORAGE GROUP TO root.vehicle.d0",
+ "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=FLOAT, ENCODING=RLE",
+ };
+
+ private final String d0s0 = "root.vehicle.d0.s0";
+
+ private static final String insertTemplate =
+ "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%d)";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ prepareData1();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ }
+
+ /**
+ * MAC: merge all chunks. Use UDTF to mimic the process of merging all chunks to calculate
+ * aggregation points.
+ */
+ @Test
+ public void testMAC() {
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "create function M4 as \"org.apache.iotdb.db.query.udf.builtin.UDTFM4MAC\"");
+
+ long tqs = 0L;
+ long tqe = 100L;
+ int w = 4;
+ boolean hasResultSet =
+ statement.execute(
+ String.format(
+ "select M4(s0,'tqs'='%1$d','tqe'='%2$d','w'='%3$d') from root.vehicle.d0 where "
+ + "time>=%1$d and time<%2$d",
+ tqs, tqe, w));
+
+ Assert.assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(
+ "M4(root.vehicle.d0.s0, \"tqs\"=\"0\", \"tqe\"=\"100\", \"w\"=\"4\")");
+ System.out.println(ans);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /** MOC: merge overlapping chunks. This is what IoTDB does. */
+ @Test
+ public void testMOC() {
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ boolean hasResultSet =
+ statement.execute(
+ "SELECT min_time(s0),first_value(s0),"
+ + "max_time(s0), last_value(s0),"
+ + "max_value(s0), min_value(s0)"
+ + " FROM root.vehicle.d0 group by ([0,100),25ms)");
+
+ Assert.assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(String.format("min_time(%s)", d0s0))
+ + ","
+ + resultSet.getString(String.format("first_value(%s)", d0s0))
+ + ","
+ + resultSet.getString(String.format("max_time(%s)", d0s0))
+ + ","
+ + resultSet.getString(String.format("last_value(%s)", d0s0))
+ + ","
+ + resultSet.getString(String.format("max_value(%s)", d0s0))
+ + ","
+ + resultSet.getString(String.format("min_value(%s)", d0s0));
+ System.out.println(ans);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * The data written is shown in figure:
+ * https://user-images.githubusercontent.com/33376433/151664843-6afcb40d-fe6e-4ea8-bdd6-efe467f40c1c.png
+ */
+ private static void prepareData1() {
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ for (String sql : creationSqls) {
+ statement.execute(sql);
+ }
+
+ // seq1
+ for (int i = 0; i <= 99; i++) {
+ statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i, i));
+ }
+ statement.executeBatch();
+ statement.clearBatch();
+ statement.execute("FLUSH");
+
+ // unseq2
+ for (int i = 10; i <= 28; i++) {
+ statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i, i));
+ }
+ statement.executeBatch();
+ statement.clearBatch();
+ statement.execute("FLUSH");
+
+ // unseq3
+ for (int i = 29; i <= 36; i++) {
+ statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i, i));
+ }
+ statement.executeBatch();
+ statement.clearBatch();
+ statement.execute("FLUSH");
+
+ // unseq4
+ for (int i = 37; i <= 60; i++) {
+ statement.addBatch(String.format(Locale.ENGLISH, insertTemplate, i, i));
+ }
+ statement.executeBatch();
+ statement.clearBatch();
+ statement.execute("FLUSH");
+
+ statement.execute("delete from root.vehicle.d0.s0 where time>=26 and time<=27");
+ statement.execute("delete from root.vehicle.d0.s0 where time>=35 and time<=40");
+ statement.execute("delete from root.vehicle.d0.s0 where time>=48 and time<=75");
+ statement.execute("delete from root.vehicle.d0.s0 where time>=85");
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}