You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/11 10:43:05 UTC
[iotdb] branch research/algebra updated: [research/algebra] To include UDF functions for queries on different data types (#8389)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch research/algebra
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/algebra by this push:
new d529e52d93 [research/algebra] To include UDF functions for queries on different data types (#8389)
d529e52d93 is described below
commit d529e52d93640f2373f083839b16f07a4ddd4c34
Author: Rui Kang <15...@qq.com>
AuthorDate: Sun Dec 11 18:42:58 2022 +0800
[research/algebra] To include UDF functions for queries on different data types (#8389)
---
.../apache/iotdb/library/query/UDTFAggCWindow.java | 70 ++++++++++++++++
.../apache/iotdb/library/query/UDTFAggEvent.java | 96 ++++++++++++++++++++++
.../org/apache/iotdb/library/query/UDTFAggMap.java | 72 ++++++++++++++++
.../apache/iotdb/library/query/UDTFAggRWindow.java | 75 +++++++++++++++++
.../iotdb/library/query/UDTFClosedRange.java | 32 ++++++++
.../iotdb/library/query/UDTFClosedValue.java | 32 ++++++++
.../org/apache/iotdb/library/query/UDTFMerge.java | 30 +++++++
.../apache/iotdb/library/query/UDTFOpenRange.java | 32 ++++++++
.../apache/iotdb/library/query/UDTFOpenValue.java | 32 ++++++++
.../iotdb/library/query/UDTFPatternCWindow.java | 68 +++++++++++++++
.../iotdb/library/query/UDTFPatternRWindow.java | 81 ++++++++++++++++++
.../iotdb/library/query/UDTFRelJoinDict.java | 38 +++++++++
.../iotdb/library/query/UDTFRelJoinTemporal.java | 50 +++++++++++
.../org/apache/iotdb/library/query/UDTFSample.java | 34 ++++++++
.../iotdb/library/query/UDTFSimAlignMulti.java | 48 +++++++++++
.../apache/iotdb/library/query/UDTFSimJoin.java | 40 +++++++++
16 files changed, 830 insertions(+)
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggCWindow.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggCWindow.java
new file mode 100644
index 0000000000..5abdf1d908
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggCWindow.java
@@ -0,0 +1,70 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.io.IOException;
+
+/**
+ * Aggregation over count-based sliding windows: every window holds a fixed number of rows.
+ *
+ * <p>Parameters read in {@code beforeStart}: {@code func} selects the aggregate ({@code "avg"} for
+ * the mean, {@code "spe"} for the sum of squares, anything else for the maximum), {@code window}
+ * is the number of rows per window and {@code skip} is the sliding step in rows. One DOUBLE point
+ * is emitted per non-empty window, stamped with the window start time. Only column 0 is read.
+ */
+public class UDTFAggCWindow implements UDTF {
+  private String func;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    func = parameters.getString("func");
+    int window = parameters.getInt("window");
+    int slide = parameters.getInt("skip");
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(window, slide))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    if (rowWindow.windowSize() == 0) { // nothing to aggregate in an empty window
+      return;
+    }
+    double res;
+    switch (func) {
+      case "avg":
+        res = findAvg(rowWindow);
+        break;
+      case "spe":
+        res = findSpecial(rowWindow);
+        break;
+      default:
+        res = findMax(rowWindow);
+    }
+    collector.putDouble(rowWindow.windowStartTime(), res);
+  }
+
+  /**
+   * Returns the largest value of column 0 in the window.
+   *
+   * @param rowWindow window to scan
+   * @return the maximum, or {@code Double.NEGATIVE_INFINITY} for an empty window
+   * @throws IOException if a row cannot be read
+   */
+  public double findMax(RowWindow rowWindow) throws IOException {
+    // Double.MIN_VALUE is the smallest *positive* double, so seeding with it would report
+    // 4.9e-324 for a window that contains only negative values.
+    double ans = Double.NEGATIVE_INFINITY;
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      ans = Math.max(ans, rowWindow.getRow(i).getDouble(0));
+    }
+    return ans;
+  }
+
+  /**
+   * Returns the arithmetic mean of column 0 over the window.
+   *
+   * @param rowWindow window to scan; expected to be non-empty (an empty window yields NaN)
+   * @return sum of the values divided by the number of rows
+   * @throws IOException if a row cannot be read
+   */
+  public double findAvg(RowWindow rowWindow) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      ans += rowWindow.getRow(i).getDouble(0);
+    }
+    return ans / rowWindow.windowSize();
+  }
+
+  /**
+   * Returns the sum of the squared values of column 0 over the window.
+   *
+   * @param rowWindow window to scan
+   * @return sum of squares, 0 for an empty window
+   * @throws IOException if a row cannot be read
+   */
+  public double findSpecial(RowWindow rowWindow) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      double value = rowWindow.getRow(i).getDouble(0);
+      ans += value * value;
+    }
+    return ans;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggEvent.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggEvent.java
new file mode 100644
index 0000000000..fbeaa62dc0
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggEvent.java
@@ -0,0 +1,96 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Aggregation by events: for every event tag, aggregates the values of column 0 over the rows
+ * whose timestamp falls inside one of that tag's intervals.
+ *
+ * <p>Parameters read in {@code beforeStart}: {@code Events} is a comma-separated list of {@code
+ * start|end|tag} triples (start inclusive, end exclusive); {@code func} is {@code "avg"} or {@code
+ * "sum"} (any other value accumulates nothing and therefore emits nothing). The result for each
+ * tag is emitted as a DOUBLE point whose timestamp is the tag itself.
+ */
+public class UDTFAggEvent implements UDTF {
+
+  // Event intervals, sorted by start time once beforeStart has run.
+  List<Temporal> temporal_data = new ArrayList<>();
+  // Running sum of the matched values per tag.
+  Map<Integer, Double> res = new HashMap<>();
+  // Number of matched rows per tag; only maintained when func is "avg".
+  Map<Integer, Integer> cnt = new HashMap<>();
+
+  String func;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    String event = parameters.getString("Events");
+    func = parameters.getString("func");
+    for (String entry : event.split(",")) {
+      String[] fields = entry.split("\\|");
+      temporal_data.add(
+          new Temporal(
+              Long.parseLong(fields[0]), Long.parseLong(fields[1]), Integer.parseInt(fields[2])));
+    }
+    // comparingLong yields 0 for equal start times; a comparator that never returns 0 breaks
+    // the Comparator contract and may make List.sort throw.
+    temporal_data.sort(Comparator.comparingLong(t -> t.st));
+    configurations
+        .setAccessStrategy(
+            new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE, Integer.MAX_VALUE))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    boolean average = func.equals("avg");
+    boolean accumulate = average || func.equals("sum");
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      Row curr = rowWindow.getRow(i);
+      long time = curr.getTime();
+      for (Temporal event : temporal_data) {
+        if (time < event.st) {
+          break; // intervals are sorted by start, so no later interval can contain this row
+        }
+        if (time >= event.ed) {
+          continue; // the interval is [st, ed): the row lies past its end
+        }
+        if (average) {
+          cnt.merge(event.tag, 1, Integer::sum);
+        }
+        if (accumulate) {
+          res.merge(event.tag, curr.getDouble(0), Double::sum);
+        }
+      }
+    }
+    // Emit in ascending tag order so the output timestamps are deterministic and increasing.
+    List<Integer> tags = new ArrayList<>(res.keySet());
+    tags.sort(Comparator.naturalOrder());
+    for (int tag : tags) {
+      double total = res.get(tag);
+      // The average is derived at output time; the stored sums stay untouched so a later
+      // transform call keeps accumulating correctly.
+      collector.putDouble(tag, average ? total / cnt.get(tag) : total);
+    }
+  }
+
+  /** One event interval {@code [st, ed)} labelled with an integer tag. */
+  private static class Temporal {
+    final long st;
+    final long ed;
+    final int tag;
+
+    Temporal(long st, long ed, int tag) {
+      this.st = st;
+      this.ed = ed;
+      this.tag = tag;
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggMap.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggMap.java
new file mode 100644
index 0000000000..05eaf21ca7
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggMap.java
@@ -0,0 +1,72 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.io.IOException;
+
+/**
+ * Row-wise conversion: for every row whose timestamp lies in {@code [start, end]}, combines the
+ * values of all input columns into one DOUBLE point at the same timestamp.
+ *
+ * <p>Parameters read in {@code beforeStart}: {@code func} selects the combination ({@code "avg"}
+ * for the mean, {@code "spe"} for the sum of squares, anything else for the maximum), {@code
+ * start} and {@code end} are the inclusive time bounds.
+ */
+public class UDTFAggMap implements UDTF {
+  private String func;
+  long st, ed;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    func = parameters.getString("func");
+    st = parameters.getLong("start");
+    ed = parameters.getLong("end");
+    configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    // The aggregates below read every column, so a missing value anywhere disqualifies the row.
+    for (int i = 0; i < row.size(); i++) {
+      if (row.isNull(i)) {
+        return;
+      }
+    }
+    if (row.getTime() > ed || row.getTime() < st) { // outside the requested time range
+      return;
+    }
+    double res;
+    switch (func) {
+      case "avg":
+      case "min":
+        // NOTE(review): "min" has always been routed to the average here; the label is kept so
+        // existing queries behave as before, while "avg" now matches the sibling UDFs.
+        res = findAvg(row);
+        break;
+      case "spe":
+        res = findSpecial(row);
+        break;
+      default:
+        res = findMax(row);
+    }
+    collector.putDouble(row.getTime(), res);
+  }
+
+  /**
+   * Returns the largest value among the columns of the row.
+   *
+   * @param row row to scan
+   * @return the maximum, or {@code Double.NEGATIVE_INFINITY} for a row without columns
+   * @throws IOException if a value cannot be read
+   */
+  public double findMax(Row row) throws IOException {
+    // Double.MIN_VALUE is the smallest *positive* double, so seeding with it would report
+    // 4.9e-324 for a row that contains only negative values.
+    double ans = Double.NEGATIVE_INFINITY;
+    for (int i = 0; i < row.size(); i++) {
+      ans = Math.max(ans, row.getDouble(i));
+    }
+    return ans;
+  }
+
+  /**
+   * Returns the arithmetic mean of the columns of the row.
+   *
+   * @param row row to scan
+   * @return sum of the column values divided by the number of columns
+   * @throws IOException if a value cannot be read
+   */
+  public double findAvg(Row row) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < row.size(); i++) {
+      ans += row.getDouble(i);
+    }
+    return ans / row.size();
+  }
+
+  /**
+   * Returns the sum of the squared column values of the row.
+   *
+   * @param row row to scan
+   * @return sum of squares
+   * @throws IOException if a value cannot be read
+   */
+  public double findSpecial(Row row) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < row.size(); i++) {
+      double value = row.getDouble(i);
+      ans += value * value;
+    }
+    return ans;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggRWindow.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggRWindow.java
new file mode 100644
index 0000000000..0c6f02d806
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggRWindow.java
@@ -0,0 +1,75 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.io.IOException;
+
+/**
+ * Aggregation over time-based sliding windows: every window spans a fixed time range.
+ *
+ * <p>Parameters read in {@code beforeStart}: {@code func} selects the aggregate ({@code "avg"} for
+ * the mean, {@code "spe"} for the sum of squares, anything else for the maximum), {@code window}
+ * is the window length and {@code skip} the sliding step (both in time units), {@code start} and
+ * {@code end} bound the windows that are reported. One DOUBLE point is emitted per reported
+ * window, stamped with the window start time. Only column 0 is read.
+ */
+public class UDTFAggRWindow implements UDTF {
+  private String func;
+  long st, ed;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    func = parameters.getString("func");
+    long window = parameters.getLong("window");
+    long slide = parameters.getLong("skip");
+    st = parameters.getLong("start");
+    ed = parameters.getLong("end");
+    configurations
+        .setAccessStrategy(new SlidingTimeWindowAccessStrategy(window, slide))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    // Skip empty windows and windows that do not overlap [st, ed] at all.
+    if (rowWindow.windowSize() == 0
+        || rowWindow.windowStartTime() > ed
+        || rowWindow.windowEndTime() < st) {
+      return;
+    }
+    double res;
+    switch (func) {
+      case "avg":
+        res = findAvg(rowWindow);
+        break;
+      case "spe":
+        res = findSpecial(rowWindow);
+        break;
+      default:
+        res = findMax(rowWindow);
+    }
+    collector.putDouble(rowWindow.windowStartTime(), res);
+  }
+
+  /**
+   * Returns the largest value of column 0 in the window.
+   *
+   * @param rowWindow window to scan
+   * @return the maximum, or {@code Double.NEGATIVE_INFINITY} for an empty window
+   * @throws IOException if a row cannot be read
+   */
+  public double findMax(RowWindow rowWindow) throws IOException {
+    // Double.MIN_VALUE is the smallest *positive* double, so seeding with it would report
+    // 4.9e-324 for a window that contains only negative values.
+    double ans = Double.NEGATIVE_INFINITY;
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      ans = Math.max(ans, rowWindow.getRow(i).getDouble(0));
+    }
+    return ans;
+  }
+
+  /**
+   * Returns the arithmetic mean of column 0 over the window.
+   *
+   * @param rowWindow window to scan; expected to be non-empty (an empty window yields NaN)
+   * @return sum of the values divided by the number of rows
+   * @throws IOException if a row cannot be read
+   */
+  public double findAvg(RowWindow rowWindow) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      ans += rowWindow.getRow(i).getDouble(0);
+    }
+    return ans / rowWindow.windowSize();
+  }
+
+  /**
+   * Returns the sum of the squared values of column 0 over the window.
+   *
+   * @param rowWindow window to scan
+   * @return sum of squares, 0 for an empty window
+   * @throws IOException if a row cannot be read
+   */
+  public double findSpecial(RowWindow rowWindow) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      double value = rowWindow.getRow(i).getDouble(0);
+      ans += value * value;
+    }
+    return ans;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFClosedRange.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFClosedRange.java
new file mode 100644
index 0000000000..4e4284aa97
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFClosedRange.java
@@ -0,0 +1,32 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/** Find subsequence of time-series data satisfying ta<=T<tb. */
+public class UDTFClosedRange implements UDTF {
+ private long lower, upper;
+
+ @Override
+ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+ throws Exception {
+ lower = parameters.getLong("from");
+ upper = parameters.getLong("to");
+ configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+ }
+
+ @Override
+ public void transform(Row row, PointCollector collector) throws Exception {
+ if (row.isNull(0) || row.isNull(1)) { // skip null rows
+ return;
+ }
+ if (row.getTime() >= this.lower && row.getTime() < this.upper) {
+ collector.putDouble(row.getTime(), row.getDouble(0));
+ }
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFClosedValue.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFClosedValue.java
new file mode 100644
index 0000000000..8b29ecb853
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFClosedValue.java
@@ -0,0 +1,32 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/** Find subsequence of time-series data satisfying a<=V<b. */
+public class UDTFClosedValue implements UDTF {
+ private double lower, upper;
+
+ @Override
+ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+ throws Exception {
+ lower = parameters.getDouble("from");
+ upper = parameters.getDouble("to");
+ configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+ }
+
+ @Override
+ public void transform(Row row, PointCollector collector) throws Exception {
+ if (row.isNull(0) || row.isNull(1)) { // skip null rows
+ return;
+ }
+ if (row.getDouble(0) >= lower && row.getDouble(0) < upper) {
+ collector.putDouble(row.getTime(), row.getDouble(0));
+ }
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFMerge.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFMerge.java
new file mode 100644
index 0000000000..46be1f4437
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFMerge.java
@@ -0,0 +1,30 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/** Merge two series with priority. */
+public class UDTFMerge implements UDTF {
+ long imx;
+
+ @Override
+ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+ throws Exception {
+ imx = parameters.getLong("pivot");
+ configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+ }
+
+ @Override
+ public void transform(Row row, PointCollector collector) throws Exception {
+ if (row.getTime() < imx) {
+ collector.putDouble(row.getTime(), row.getDouble(0));
+ } else {
+ collector.putDouble(row.getTime(), row.getDouble(1));
+ }
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFOpenRange.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFOpenRange.java
new file mode 100644
index 0000000000..c638400dd0
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFOpenRange.java
@@ -0,0 +1,32 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/** Find subsequence of time-series data satisfying T < tb || T > ta, ta > tb. */
+public class UDTFOpenRange implements UDTF {
+ private double lessThan, greaterThan;
+
+ @Override
+ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+ throws Exception {
+ lessThan = parameters.getDouble("less_than");
+ greaterThan = parameters.getDouble("greater_than");
+ configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+ }
+
+ @Override
+ public void transform(Row row, PointCollector collector) throws Exception {
+ if (row.isNull(0) || row.isNull(1)) { // skip null rows
+ return;
+ }
+ if (row.getTime() <= lessThan || row.getTime() >= greaterThan) {
+ collector.putDouble(row.getTime(), row.getDouble(0));
+ }
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFOpenValue.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFOpenValue.java
new file mode 100644
index 0000000000..90876504b4
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFOpenValue.java
@@ -0,0 +1,32 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/** Find subsequence of time-series data satisfying V < b || V > a, a > b. */
+public class UDTFOpenValue implements UDTF {
+ private double lessThan, greaterThan;
+
+ @Override
+ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+ throws Exception {
+ lessThan = parameters.getDouble("less_than");
+ greaterThan = parameters.getDouble("greater_than");
+ configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+ }
+
+ @Override
+ public void transform(Row row, PointCollector collector) throws Exception {
+ if (row.isNull(0) || row.isNull(1)) { // skip null rows
+ return;
+ }
+ if (row.getDouble(0) <= lessThan || row.getDouble(0) >= greaterThan) {
+ collector.putDouble(row.getTime(), row.getDouble(0));
+ }
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFPatternCWindow.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFPatternCWindow.java
new file mode 100644
index 0000000000..916f00276d
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFPatternCWindow.java
@@ -0,0 +1,68 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Pattern matching over count-based windows: slides a window of {@code pattern.length} rows over
+ * the series one row at a time and emits the Euclidean distance between the pattern and column 0
+ * of the window, stamped with the window start time.
+ *
+ * <p>The {@code pattern} parameter is a colon-separated list of doubles, e.g. {@code 1.0:2.5:3}.
+ */
+public class UDTFPatternCWindow implements UDTF {
+  private List<Double> pattern = new ArrayList<>();
+
+  /**
+   * Appends {@code len} random values in {@code [0, 1)} to the current pattern. Not called by this
+   * class itself.
+   *
+   * @param len number of random points to append
+   */
+  public void init(int len) {
+    Random rd = new Random();
+    for (int i = 0; i < len; i++) {
+      pattern.add(rd.nextDouble());
+    }
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    String[] pat = parameters.getString("pattern").split(":");
+    pattern = Arrays.stream(pat).map(Double::parseDouble).collect(Collectors.toList());
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(pat.length, 1))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    int rows = rowWindow.windowSize();
+    // distance() reads pattern.size() rows. Windows near the end of the series presumably hold
+    // fewer rows than requested, so they are skipped instead of being indexed out of bounds.
+    if (rows == 0 || rows < pattern.size()) {
+      return;
+    }
+    collector.putDouble(rowWindow.windowStartTime(), distance(rowWindow));
+  }
+
+  /**
+   * Euclidean distance between the pattern and the first {@code pattern.size()} rows of the
+   * window (column 0). The caller guarantees the window holds at least that many rows.
+   */
+  private double distance(RowWindow rowWindow) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < pattern.size(); i++) {
+      double diff = pattern.get(i) - rowWindow.getRow(i).getDouble(0);
+      ans += diff * diff;
+    }
+    return Math.sqrt(ans);
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFPatternRWindow.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFPatternRWindow.java
new file mode 100644
index 0000000000..6ab549a912
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFPatternRWindow.java
@@ -0,0 +1,81 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Pattern matching over time-based windows: every window is cut into {@code pattern.size()}
+ * consecutive segments of equal row count, each segment is averaged (column 0), and the
+ * Euclidean distance between the pattern and those averages is emitted at the window start time.
+ *
+ * <p>Parameters read in {@code beforeStart}: {@code pattern} is a colon-separated list of
+ * doubles, {@code window} the window length and {@code skip} the sliding step (time units).
+ */
+public class UDTFPatternRWindow implements UDTF {
+  private List<Double> pattern = new ArrayList<>();
+
+  /**
+   * Appends {@code len} random values in {@code [0, 1)} to the current pattern. Not called by this
+   * class itself.
+   *
+   * @param len number of random points to append
+   */
+  public void init(int len) {
+    Random rd = new Random();
+    for (int i = 0; i < len; i++) {
+      pattern.add(rd.nextDouble());
+    }
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    String[] pat = parameters.getString("pattern").split(":");
+    pattern = Arrays.stream(pat).map(Double::parseDouble).collect(Collectors.toList());
+    long wsz = parameters.getLong("window");
+    long skip = parameters.getLong("skip");
+    configurations
+        .setAccessStrategy(new SlidingTimeWindowAccessStrategy(wsz, skip))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    // Rows available per pattern point (integer division; surplus rows at the end are ignored).
+    int avgPts = rowWindow.windowSize() / pattern.size();
+    // With fewer rows than pattern points every segment would be empty and its average 0/0,
+    // which would emit NaN as the distance; such windows (including empty ones) are skipped.
+    if (avgPts == 0) {
+      return;
+    }
+    collector.putDouble(rowWindow.windowStartTime(), distance(rowWindow, avgPts));
+  }
+
+  /**
+   * Euclidean distance between the pattern and the per-segment averages of the window.
+   *
+   * @param rowWindow window holding at least {@code pattern.size() * avgPts} rows
+   * @param avgPts number of rows averaged per pattern point, at least 1
+   */
+  private double distance(RowWindow rowWindow, int avgPts) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < pattern.size(); i++) {
+      int base = i * avgPts;
+      double sum = 0.;
+      for (int j = 0; j < avgPts; j++) {
+        sum += rowWindow.getRow(base + j).getDouble(0);
+      }
+      double diff = pattern.get(i) - sum / avgPts;
+      ans += diff * diff;
+    }
+    return Math.sqrt(ans);
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFRelJoinDict.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFRelJoinDict.java
new file mode 100644
index 0000000000..a697699c53
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFRelJoinDict.java
@@ -0,0 +1,38 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Joins a series with a relational lookup table on the series value.
+ *
+ * <p>The {@code dict} parameter is a colon-separated list of {@code key|value} pairs with integer
+ * keys; the first occurrence of a key wins. For every row, the value of column 0 is scaled by
+ * 1000 and truncated to an integer key; when the table contains that key, the mapped value is
+ * emitted at the row's timestamp. Rows without a match produce no output.
+ */
+public class UDTFRelJoinDict implements UDTF {
+  Map<Integer, Double> dic = new HashMap<>();
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    for (String entry : parameters.getString("dict").split(":")) {
+      String[] pair = entry.split("\\|");
+      Integer key = Integer.parseInt(pair[0]);
+      if (!dic.containsKey(key)) { // keep the first mapping of a repeated key
+        dic.put(key, Double.parseDouble(pair[1]));
+      }
+    }
+    configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (row.isNull(0)) { // no value to look up
+      return;
+    }
+    // The cast binds tighter than '*': "(int) v * 1000" truncates v first and so discards the
+    // fractional digits the scaling is meant to keep. Scale first, then truncate.
+    // NOTE(review): confirm the table keys are value * 1000 truncated to int.
+    int key = (int) (row.getDouble(0) * 1000);
+    Double mapped = dic.get(key);
+    if (mapped != null) {
+      collector.putDouble(row.getTime(), mapped);
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFRelJoinTemporal.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFRelJoinTemporal.java
new file mode 100644
index 0000000000..d65f30712b
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFRelJoinTemporal.java
@@ -0,0 +1,50 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Joins a series with a temporal table on the row timestamp.
+ *
+ * <p>The {@code Events} parameter is a comma-separated list of {@code start|end|tag} triples. For
+ * every row and every interval with {@code start <= T < end}, the interval's tag is emitted as a
+ * DOUBLE point at the row's timestamp; a row covered by several intervals is emitted once per
+ * interval, in the order the intervals were given.
+ */
+public class UDTFRelJoinTemporal implements UDTF {
+  List<Temporal> temporal_data = new ArrayList<>();
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    String[] entries = parameters.getString("Events").split(",");
+    for (String entry : entries) {
+      String[] fields = entry.split("\\|");
+      long start = Long.parseLong(fields[0]);
+      long end = Long.parseLong(fields[1]);
+      int label = Integer.parseInt(fields[2]);
+      temporal_data.add(new Temporal(start, end, label));
+    }
+    configurations
+        .setAccessStrategy(new RowByRowAccessStrategy())
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    for (int idx = 0; idx < temporal_data.size(); idx++) {
+      Temporal interval = temporal_data.get(idx);
+      boolean started = row.getTime() >= interval.st;
+      if (started && row.getTime() < interval.ed) {
+        collector.putDouble(row.getTime(), interval.tag);
+      }
+    }
+  }
+
+  /** One interval {@code [st, ed)} labelled with an integer tag. */
+  private class Temporal {
+    long st, ed;
+    int tag;
+
+    public Temporal(long st, long ed, int tag) {
+      this.tag = tag;
+      this.st = st;
+      this.ed = ed;
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSample.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSample.java
new file mode 100644
index 0000000000..f64a8f0841
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSample.java
@@ -0,0 +1,34 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.Random;
+
+/**
+ * Random sampling of a 1-d series: every row is kept independently with probability {@code rate}
+ * (parameter {@code rate}, default 0.5) and its column-0 value is forwarded unchanged.
+ */
+public class UDTFSample implements UDTF {
+  double rate;
+  // The no-arg constructor picks a seed that differs between instances, unlike the wall clock,
+  // which gives identical sequences to instances created within the same millisecond.
+  Random rd = new Random();
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    rate = parameters.getDoubleOrDefault("rate", 0.5);
+    configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (row.isNull(0)) { // nothing to sample
+      return;
+    }
+    // nextDouble() is uniform on [0, 1): a strict '<' keeps a row with probability exactly
+    // 'rate', never at rate = 0 and always at rate = 1.
+    if (rd.nextDouble() < rate) {
+      collector.putDouble(row.getTime(), row.getDouble(0));
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSimAlignMulti.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSimAlignMulti.java
new file mode 100644
index 0000000000..9c88c1e19a
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSimAlignMulti.java
@@ -0,0 +1,48 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/**
+ * Similarity alignment of three or more series with a tolerance {@code eps} (parameter {@code
+ * eps}, default 1) on matching timestamps.
+ *
+ * <p>For every row i the rows j in {@code [i - 5, i + 5)} are scanned for a timestamp within
+ * {@code eps} of row i; for the first such j, the rows k in {@code [j - 5, j + 5)} are scanned for
+ * a timestamp within {@code eps} of both row i and row j. The timestamp of the first k found is
+ * emitted (as a double) at row i's timestamp, or -1 when no pair is found.
+ */
+public class UDTFSimAlignMulti implements UDTF {
+  // Number of neighbouring rows inspected before the current row; the scan stops one row short
+  // of that distance after it. NOTE(review): confirm the asymmetric range is intended.
+  private static final int NEIGHBOUR_RADIUS = 5;
+
+  double eps;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    eps = parameters.getDoubleOrDefault("eps", 1);
+    configurations
+        .setAccessStrategy(
+            new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE, Integer.MAX_VALUE))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    int size = rowWindow.windowSize();
+    for (int i = 0; i < size; i++) {
+      // Timestamps are copied into locals right after each getRow call so the comparisons never
+      // depend on several Row handles of the same window staying valid at once.
+      // NOTE(review): confirm whether RowWindow#getRow hands out a reused Row instance.
+      long currTime = rowWindow.getRow(i).getTime();
+      long signal1 = -1;
+      long signal2 = -1;
+      int jEnd = Math.min(i + NEIGHBOUR_RADIUS, size);
+      for (int j = Math.max(i - NEIGHBOUR_RADIUS, 0); j < jEnd; j++) {
+        long time1 = rowWindow.getRow(j).getTime();
+        if (Math.abs(currTime - time1) > eps) {
+          continue;
+        }
+        int kEnd = Math.min(j + NEIGHBOUR_RADIUS, size);
+        for (int k = Math.max(j - NEIGHBOUR_RADIUS, 0); k < kEnd; k++) {
+          long time2 = rowWindow.getRow(k).getTime();
+          if (Math.abs(currTime - time2) <= eps && Math.abs(time1 - time2) <= eps) {
+            signal1 = time1;
+            signal2 = time2;
+            break;
+          }
+        }
+        if (signal1 != -1) {
+          break; // first matching pair wins
+        }
+      }
+      collector.putDouble(currTime, signal2);
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSimJoin.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSimJoin.java
new file mode 100644
index 0000000000..36eed6e284
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSimJoin.java
@@ -0,0 +1,40 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/**
+ * Similarity join of two series with a tolerance {@code eps} (parameter {@code eps}, default 1)
+ * on matching timestamps.
+ *
+ * <p>For every row i the rows j in {@code [i - 5, i + 5)} are scanned; the timestamp of the last
+ * row whose time lies within {@code eps} of row i is emitted (as a double) at row i's timestamp,
+ * or -1 when there is none.
+ */
+public class UDTFSimJoin implements UDTF {
+  // Number of neighbouring rows inspected before the current row; the scan stops one row short
+  // of that distance after it. NOTE(review): confirm the asymmetric range is intended.
+  private static final int NEIGHBOUR_RADIUS = 5;
+
+  double eps;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    eps = parameters.getDoubleOrDefault("eps", 1);
+    configurations
+        .setAccessStrategy(
+            new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE, Integer.MAX_VALUE))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    int size = rowWindow.windowSize();
+    for (int i = 0; i < size; i++) {
+      // The timestamp is copied into a local before any neighbour is fetched so the comparison
+      // never depends on two Row handles of the same window staying valid at once.
+      // NOTE(review): confirm whether RowWindow#getRow hands out a reused Row instance.
+      long currTime = rowWindow.getRow(i).getTime();
+      long signal = -1;
+      int end = Math.min(i + NEIGHBOUR_RADIUS, size);
+      for (int j = Math.max(i - NEIGHBOUR_RADIUS, 0); j < end; j++) {
+        long compTime = rowWindow.getRow(j).getTime();
+        if (Math.abs(currTime - compTime) <= eps) {
+          signal = compTime; // no break: the last match in the range wins
+        }
+      }
+      collector.putDouble(currTime, signal);
+    }
+  }
+}