You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/03 11:32:14 UTC
[iotdb] branch master updated: [IOTDB-3973] Implement a new UDF named 'deDup' to only keep distinct … (#6874)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8f12321e75 [IOTDB-3973] Implement a new UDF named 'deDup' to only keep distinct … (#6874)
8f12321e75 is described below
commit 8f12321e75c1778ad1a9c4a9f9b7527893f3cf58
Author: Weihao Li <60...@users.noreply.github.com>
AuthorDate: Wed Aug 3 19:32:07 2022 +0800
[IOTDB-3973] Implement a new UDF named 'deDup' to only keep distinct … (#6874)
---
docs/UserGuide/UDF-Library/Series-Processing.md | 70 +++++++++++
docs/zh/UserGuide/UDF-Library/Series-Processing.md | 69 ++++++++++
.../BuiltinTimeSeriesGeneratingFunctionEnum.java | 1 +
.../db/it/udf/IoTDBUDTFBuiltinFunctionIT.java | 89 +++++++++++--
.../BuiltinTimeSeriesGeneratingFunction.java | 1 +
.../commons/udf/builtin/UDTFDeDuplication.java | 139 +++++++++++++++++++++
6 files changed, 356 insertions(+), 13 deletions(-)
diff --git a/docs/UserGuide/UDF-Library/Series-Processing.md b/docs/UserGuide/UDF-Library/Series-Processing.md
new file mode 100644
index 0000000000..e3be87f677
--- /dev/null
+++ b/docs/UserGuide/UDF-Library/Series-Processing.md
@@ -0,0 +1,70 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+# Series Processing
+
+## DEDUP
+
+### Usage
+
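+This function removes consecutive identical values from an input series: for each run of equal adjacent values, only the first point (with its original timestamp) is kept.
+For example, the input `1,1,2,2,3` produces the output `1,2,3`; non-consecutive repeats such as `1,2,1` are kept unchanged.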
+
+**Name:** DEDUP
+
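+**Input Series:** Supports only one input series, of type BOOLEAN, INT32, INT64, FLOAT, DOUBLE, or TEXT.
+
+**Output Series:** Outputs a single series with the same data type as the input series.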
+
+**Parameters:** No parameters.
+
+### Example
+
+Raw data:
+
+```
++-----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
+| Time|root.testDeDup.d1.s1|root.testDeDup.d1.s2|root.testDeDup.d1.s3|root.testDeDup.d1.s4|root.testDeDup.d1.s5|root.testDeDup.d1.s6|
++-----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
+|1970-01-01T08:00:00.001+08:00| true| 1| 1| 1.0| 1.0| 1test1|
+|1970-01-01T08:00:00.002+08:00| true| 2| 2| 2.0| 1.0| 2test2|
+|1970-01-01T08:00:00.003+08:00| false| 1| 2| 1.0| 1.0| 2test2|
+|1970-01-01T08:00:00.004+08:00| true| 1| 3| 1.0| 1.0| 1test1|
+|1970-01-01T08:00:00.005+08:00| true| 1| 3| 1.0| 1.0| 1test1|
++-----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
+```
+
+SQL for query:
+
+```sql
+select deDup(s1), deDup(s2), deDup(s3), deDup(s4), deDup(s5), deDup(s6) from root.testDeDup.d1
+```
+
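+Output series (the columns are aligned by timestamp, so `null` means that the corresponding series produced no point at that time):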
+
+```
++-----------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
+| Time|deDup(root.testDeDup.d1.s1)|deDup(root.testDeDup.d1.s2)|deDup(root.testDeDup.d1.s3)|deDup(root.testDeDup.d1.s4)|deDup(root.testDeDup.d1.s5)|deDup(root.testDeDup.d1.s6)|
++-----------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
+|1970-01-01T08:00:00.001+08:00| true| 1| 1| 1.0| 1.0| 1test1|
+|1970-01-01T08:00:00.002+08:00| null| 2| 2| 2.0| null| 2test2|
+|1970-01-01T08:00:00.003+08:00| false| 1| null| 1.0| null| null|
+|1970-01-01T08:00:00.004+08:00| true| null| 3| null| null| 1test1|
++-----------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
+```
\ No newline at end of file
diff --git a/docs/zh/UserGuide/UDF-Library/Series-Processing.md b/docs/zh/UserGuide/UDF-Library/Series-Processing.md
new file mode 100644
index 0000000000..677606df4e
--- /dev/null
+++ b/docs/zh/UserGuide/UDF-Library/Series-Processing.md
@@ -0,0 +1,69 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+# 序列处理
+
+## DEDUP
+
+### 函数简介
+
+本函数用于去除输入序列中的连续相同值：对于每一段相邻且相等的值，只保留第一个数据点（及其原始时间戳）。例如，输入序列`1,1,2,2,3`的输出序列为`1,2,3`；不相邻的重复值（如`1,2,1`）会被保留。
+
+**函数名:** DEDUP
+
+**输入序列:** 仅支持输入1个序列，类型为 BOOLEAN、INT32、INT64、FLOAT、DOUBLE 或 TEXT。
+
+**输出序列:** 输出单个序列，类型与输入序列相同。
+
+**参数:** 无
+
+### 使用示例
+
+原始数据:
+
+```
++-----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
+| Time|root.testDeDup.d1.s1|root.testDeDup.d1.s2|root.testDeDup.d1.s3|root.testDeDup.d1.s4|root.testDeDup.d1.s5|root.testDeDup.d1.s6|
++-----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
+|1970-01-01T08:00:00.001+08:00| true| 1| 1| 1.0| 1.0| 1test1|
+|1970-01-01T08:00:00.002+08:00| true| 2| 2| 2.0| 1.0| 2test2|
+|1970-01-01T08:00:00.003+08:00| false| 1| 2| 1.0| 1.0| 2test2|
+|1970-01-01T08:00:00.004+08:00| true| 1| 3| 1.0| 1.0| 1test1|
+|1970-01-01T08:00:00.005+08:00| true| 1| 3| 1.0| 1.0| 1test1|
++-----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
+```
+
+用于查询的SQL语句:
+
+```sql
+select deDup(s1), deDup(s2), deDup(s3), deDup(s4), deDup(s5), deDup(s6) from root.testDeDup.d1
+```
+
+输出序列（各列按时间戳对齐，`null` 表示对应序列在该时间点没有输出数据点）:
+
+```
++-----------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
+| Time|deDup(root.testDeDup.d1.s1)|deDup(root.testDeDup.d1.s2)|deDup(root.testDeDup.d1.s3)|deDup(root.testDeDup.d1.s4)|deDup(root.testDeDup.d1.s5)|deDup(root.testDeDup.d1.s6)|
++-----------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
+|1970-01-01T08:00:00.001+08:00| true| 1| 1| 1.0| 1.0| 1test1|
+|1970-01-01T08:00:00.002+08:00| null| 2| 2| 2.0| null| 2test2|
+|1970-01-01T08:00:00.003+08:00| false| 1| null| 1.0| null| null|
+|1970-01-01T08:00:00.004+08:00| true| null| 3| null| null| 1test1|
++-----------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
+```
\ No newline at end of file
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
index dd78d79f7c..e18223182c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
@@ -55,6 +55,7 @@ public enum BuiltinTimeSeriesGeneratingFunctionEnum {
STRING_LOWER("LOWER"),
STRING_TRIM("TRIM"),
STRING_CMP("STRCMP"),
+ DE_DUP("DEDUP"),
DIFFERENCE("DIFFERENCE"),
NON_NEGATIVE_DIFFERENCE("NON_NEGATIVE_DIFFERENCE"),
TIME_DIFFERENCE("TIME_DIFFERENCE"),
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java
index 090adb034c..131bb9d399 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java
@@ -35,6 +35,8 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -84,8 +86,8 @@ public class IoTDBUDTFBuiltinFunctionIT {
private static void insertData() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
- for (String dataGenerationSql : INSERTION_SQLS) {
- statement.execute(dataGenerationSql);
+ for (String dataGenerationSQL : INSERTION_SQLS) {
+ statement.execute(dataGenerationSQL);
}
} catch (SQLException throwable) {
fail(throwable.getMessage());
@@ -384,8 +386,8 @@ public class IoTDBUDTFBuiltinFunctionIT {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
- for (String dataGenerationSql : ZERO_ONE_SQL) {
- statement.execute(dataGenerationSql);
+ for (String dataGenerationSQL : ZERO_ONE_SQL) {
+ statement.execute(dataGenerationSQL);
}
} catch (SQLException throwable) {
fail(throwable.getMessage());
@@ -743,8 +745,8 @@ public class IoTDBUDTFBuiltinFunctionIT {
int[] ANSWER1 = new int[] {1, 2, 39, 40, 41, 42, 79, 80, 81, 82, 99, 100};
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
- for (String dataGenerationSql : SQL_FOR_SAMPLE) {
- statement.execute(dataGenerationSql);
+ for (String dataGenerationSQL : SQL_FOR_SAMPLE) {
+ statement.execute(dataGenerationSQL);
}
} catch (SQLException throwable) {
fail(throwable.getMessage());
@@ -1041,14 +1043,14 @@ public class IoTDBUDTFBuiltinFunctionIT {
@Test
public void testStringFunctions() {
- String[] createSqls =
+ String[] createSQLs =
new String[] {
"SET STORAGE GROUP TO root.testStringFunctions",
"CREATE TIMESERIES root.testStringFunctions.d1.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN",
"CREATE TIMESERIES root.testStringFunctions.d1.s2 WITH DATATYPE=TEXT, ENCODING=PLAIN",
};
- String[] insertSqls =
+ String[] insertSQLs =
new String[] {
"INSERT INTO root.testStringFunctions.d1(timestamp,s1,s2) values(1, \"1111test1111\", \" 1111test1111 \")",
"INSERT INTO root.testStringFunctions.d1(timestamp,s1) values(2, \"2222test2222\")"
@@ -1057,13 +1059,13 @@ public class IoTDBUDTFBuiltinFunctionIT {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
- for (String createSql : createSqls) {
- statement.execute(createSql);
+ for (String createSQL : createSQLs) {
+ statement.execute(createSQL);
}
- for (String insertSql : insertSqls) {
- // TODO statement.addBatch(insertSql);
- statement.execute(insertSql);
+ for (String insertSQL : insertSQLs) {
+ // TODO statement.addBatch(insertSQL);
+ statement.execute(insertSQL);
}
// TODO statement.executeBatch();
@@ -1229,4 +1231,65 @@ public class IoTDBUDTFBuiltinFunctionIT {
fail(e.getMessage());
}
}
+
+  /**
+   * Verifies DEDUP on every supported data type (BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT): each
+   * run of consecutive equal values collapses to its first point, while non-consecutive repeats
+   * (e.g. s1 = true, true, false, true, true -> true, false, true) are kept.
+   */
+  @Test
+  public void testDeDup() {
+    String[] createSQLs =
+        new String[] {
+          "SET STORAGE GROUP TO root.testDeDup",
+          "CREATE TIMESERIES root.testDeDup.d1.s1 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+          "CREATE TIMESERIES root.testDeDup.d1.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+          "CREATE TIMESERIES root.testDeDup.d1.s3 WITH DATATYPE=INT64, ENCODING=PLAIN",
+          "CREATE TIMESERIES root.testDeDup.d1.s4 WITH DATATYPE=FLOAT, ENCODING=PLAIN",
+          "CREATE TIMESERIES root.testDeDup.d1.s5 WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
+          "CREATE TIMESERIES root.testDeDup.d1.s6 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+        };
+
+    String[] insertSQLs =
+        new String[] {
+          "INSERT INTO root.testDeDup.d1(timestamp, s1, s2, s3, s4, s5, s6) values(1, true, 1, 1, 1.0, 1.0, \"1test1\")",
+          "INSERT INTO root.testDeDup.d1(timestamp, s1, s2, s3, s4, s5, s6) values(2, true, 2, 2, 2.0, 1.0, \"2test2\")",
+          "INSERT INTO root.testDeDup.d1(timestamp, s1, s2, s3, s4, s5, s6) values(3, false, 1, 2, 1.0, 1.0, \"2test2\")",
+          "INSERT INTO root.testDeDup.d1(timestamp, s1, s2, s3, s4, s5, s6) values(4, true, 1, 3, 1.0, 1.0, \"1test1\")",
+          "INSERT INTO root.testDeDup.d1(timestamp, s1, s2, s3, s4, s5, s6) values(5, true, 1, 3, 1.0, 1.0, \"1test1\")"
+        };
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (String createSQL : createSQLs) {
+        statement.execute(createSQL);
+      }
+      for (String insertSQL : insertSQLs) {
+        statement.execute(insertSQL);
+      }
+    } catch (SQLException e) {
+      // Fail loudly: swallowing a setup error would let this test pass without checking anything.
+      fail(e.getMessage());
+    }
+
+    String[] expectedHeader =
+        new String[] {
+          TIMESTAMP_STR,
+          "deDup(root.testDeDup.d1.s1)",
+          "deDup(root.testDeDup.d1.s2)",
+          "deDup(root.testDeDup.d1.s3)",
+          "deDup(root.testDeDup.d1.s4)",
+          "deDup(root.testDeDup.d1.s5)",
+          "deDup(root.testDeDup.d1.s6)"
+        };
+
+    // Columns are aligned by time, so "null" means that series emitted no point at that timestamp.
+    String[] retArray =
+        new String[] {
+          "1,true,1,1,1.0,1.0,1test1,",
+          "2,null,2,2,2.0,null,2test2,",
+          "3,false,1,null,1.0,null,null,",
+          "4,true,null,3,null,null,1test1,"
+        };
+
+    // resultSetEqualTest opens its own connection, so it runs after the setup connection closes.
+    resultSetEqualTest(
+        "select deDup(s1), deDup(s2), deDup(s3), deDup(s4), deDup(s5), deDup(s6) from root.testDeDup.d1",
+        expectedHeader,
+        retArray);
+  }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
index a5b72955f8..eaeb7b4edf 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
@@ -67,6 +67,7 @@ public enum BuiltinTimeSeriesGeneratingFunction {
STRING_LOWER("LOWER", UDTFLower.class),
STRING_TRIM("TRIM", UDTFTrim.class),
STRING_CMP("STRCMP", UDTFStrCompare.class),
+ DE_DUP("DEDUP", UDTFDeDuplication.class),
DIFFERENCE("DIFFERENCE", UDTFCommonValueDifference.class),
NON_NEGATIVE_DIFFERENCE("NON_NEGATIVE_DIFFERENCE", UDTFNonNegativeValueDifference.class),
TIME_DIFFERENCE("TIME_DIFFERENCE", UDTFTimeDifference.class),
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFDeDuplication.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFDeDuplication.java
new file mode 100644
index 0000000000..f66c635339
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFDeDuplication.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/**
+ * Built-in UDTF {@code DEDUP}: returns a series in which consecutive identical values of the single
+ * input series are removed, keeping only the first point (with its original timestamp) of each run.
+ *
+ * <p>For example, the values {@code 1,1,2,2,3} produce {@code 1,2,3}. Non-consecutive repeats are
+ * kept: {@code 1,2,1} produces {@code 1,2,1}. The output data type equals the input data type.
+ *
+ * <p>FLOAT and DOUBLE values are compared with {@link Float#compare} / {@link Double#compare}, so a
+ * run of {@code NaN} values is collapsed like any other run, and {@code 0.0} and {@code -0.0} are
+ * treated as different values.
+ *
+ * <p>The last emitted value is held in instance fields, so each instance tracks exactly one input
+ * series.
+ */
+public class UDTFDeDuplication implements UDTF {
+  /** True until the first point has been processed; the first point is always emitted. */
+  private boolean isFirst = true;
+  /** Data type of the single input series; also used as the output data type. */
+  private Type dataType;
+
+  // Last emitted value. Only the field matching dataType is ever read or written.
+  private boolean cacheBoolean;
+  private int cacheInt;
+  private long cacheLong;
+  private float cacheFloat;
+  private double cacheDouble;
+  private String cacheString;
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    validator.validateInputSeriesNumber(1);
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    dataType = parameters.getDataType(0);
+    configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(dataType);
+  }
+
+  /**
+   * Emits the current point when it is the first point seen or when its value differs from the
+   * last emitted value; otherwise the point is dropped.
+   *
+   * @throws UnsupportedOperationException if the input data type is not one of BOOLEAN, INT32,
+   *     INT64, FLOAT, DOUBLE or TEXT (previously such input silently produced an empty series)
+   */
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    long time = row.getTime();
+    switch (dataType) {
+      case BOOLEAN:
+        {
+          boolean value = row.getBoolean(0);
+          if (isFirst || value != cacheBoolean) {
+            cacheBoolean = value;
+            collector.putBoolean(time, value);
+          }
+          break;
+        }
+      case INT32:
+        {
+          int value = row.getInt(0);
+          if (isFirst || value != cacheInt) {
+            cacheInt = value;
+            collector.putInt(time, value);
+          }
+          break;
+        }
+      case INT64:
+        {
+          long value = row.getLong(0);
+          if (isFirst || value != cacheLong) {
+            cacheLong = value;
+            collector.putLong(time, value);
+          }
+          break;
+        }
+      case FLOAT:
+        {
+          float value = row.getFloat(0);
+          // Float.compare (not !=) so that consecutive NaN values count as identical.
+          if (isFirst || Float.compare(value, cacheFloat) != 0) {
+            cacheFloat = value;
+            collector.putFloat(time, value);
+          }
+          break;
+        }
+      case DOUBLE:
+        {
+          double value = row.getDouble(0);
+          // Double.compare (not !=) so that consecutive NaN values count as identical.
+          if (isFirst || Double.compare(value, cacheDouble) != 0) {
+            cacheDouble = value;
+            collector.putDouble(time, value);
+          }
+          break;
+        }
+      case TEXT:
+        {
+          String value = row.getString(0);
+          if (isFirst || !value.equals(cacheString)) {
+            cacheString = value;
+            collector.putString(time, value);
+          }
+          break;
+        }
+      default:
+        throw new UnsupportedOperationException("Unsupported data type: " + dataType);
+    }
+    isFirst = false;
+  }
+}