You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/11 10:43:05 UTC

[iotdb] branch research/algebra updated: [research/algebra] To include UDF functions for queries on different data types (#8389)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch research/algebra
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/research/algebra by this push:
     new d529e52d93 [research/algebra] To include UDF functions for queries on different data types (#8389)
d529e52d93 is described below

commit d529e52d93640f2373f083839b16f07a4ddd4c34
Author: Rui Kang <15...@qq.com>
AuthorDate: Sun Dec 11 18:42:58 2022 +0800

    [research/algebra] To include UDF functions for queries on different data types (#8389)
---
 .../apache/iotdb/library/query/UDTFAggCWindow.java | 70 ++++++++++++++++
 .../apache/iotdb/library/query/UDTFAggEvent.java   | 96 ++++++++++++++++++++++
 .../org/apache/iotdb/library/query/UDTFAggMap.java | 72 ++++++++++++++++
 .../apache/iotdb/library/query/UDTFAggRWindow.java | 75 +++++++++++++++++
 .../iotdb/library/query/UDTFClosedRange.java       | 32 ++++++++
 .../iotdb/library/query/UDTFClosedValue.java       | 32 ++++++++
 .../org/apache/iotdb/library/query/UDTFMerge.java  | 30 +++++++
 .../apache/iotdb/library/query/UDTFOpenRange.java  | 32 ++++++++
 .../apache/iotdb/library/query/UDTFOpenValue.java  | 32 ++++++++
 .../iotdb/library/query/UDTFPatternCWindow.java    | 68 +++++++++++++++
 .../iotdb/library/query/UDTFPatternRWindow.java    | 81 ++++++++++++++++++
 .../iotdb/library/query/UDTFRelJoinDict.java       | 38 +++++++++
 .../iotdb/library/query/UDTFRelJoinTemporal.java   | 50 +++++++++++
 .../org/apache/iotdb/library/query/UDTFSample.java | 34 ++++++++
 .../iotdb/library/query/UDTFSimAlignMulti.java     | 48 +++++++++++
 .../apache/iotdb/library/query/UDTFSimJoin.java    | 40 +++++++++
 16 files changed, 830 insertions(+)

diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggCWindow.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggCWindow.java
new file mode 100644
index 0000000000..5abdf1d908
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggCWindow.java
@@ -0,0 +1,70 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.io.IOException;
+
+/**
+ * Aggregation over count-based sliding windows: each window holds a fixed number of rows.
+ *
+ * <p>Parameters read in {@code beforeStart}:
+ *
+ * <ul>
+ *   <li>{@code func}: {@code "avg"} for the mean, {@code "spe"} for the sum of squares; any other
+ *       value selects the maximum.
+ *   <li>{@code window}: first argument of the {@code SlidingSizeWindowAccessStrategy}.
+ *   <li>{@code skip}: second argument (the sliding step) of the access strategy.
+ * </ul>
+ *
+ * <p>Only the first input column is read. One DOUBLE point is emitted per non-empty window, at the
+ * window start time.
+ */
+public class UDTFAggCWindow implements UDTF {
+  private String func;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    func = parameters.getString("func");
+    int window = parameters.getInt("window");
+    int slide = parameters.getInt("skip");
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(window, slide))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    if (rowWindow.windowSize() == 0) { // nothing to aggregate
+      return;
+    }
+    double res;
+    switch (func) {
+      case "avg":
+        res = findAvg(rowWindow);
+        break;
+      case "spe":
+        res = findSpecial(rowWindow);
+        break;
+      default:
+        res = findMax(rowWindow);
+    }
+    collector.putDouble(rowWindow.windowStartTime(), res);
+  }
+
+  /**
+   * Returns the largest first-column value in the window.
+   *
+   * @param rowWindow window to scan
+   * @return the maximum, or {@code Double.NEGATIVE_INFINITY} for an empty window
+   * @throws IOException if a row cannot be read
+   */
+  public double findMax(RowWindow rowWindow) throws IOException {
+    // Double.MIN_VALUE is the smallest POSITIVE double, so it must not be used as the seed:
+    // a window of negative values would otherwise report 4.9e-324 as its maximum.
+    double ans = Double.NEGATIVE_INFINITY;
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      ans = Double.max(ans, rowWindow.getRow(i).getDouble(0));
+    }
+    return ans;
+  }
+
+  /**
+   * Returns the arithmetic mean of the first-column values in the window.
+   *
+   * @param rowWindow window to scan
+   * @return sum of the values divided by the window size (NaN for an empty window)
+   * @throws IOException if a row cannot be read
+   */
+  public double findAvg(RowWindow rowWindow) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      ans += rowWindow.getRow(i).getDouble(0);
+    }
+    return ans / rowWindow.windowSize();
+  }
+
+  /**
+   * Returns the sum of squares of the first-column values in the window.
+   *
+   * @param rowWindow window to scan
+   * @return the sum of {@code v * v} over all rows (0 for an empty window)
+   * @throws IOException if a row cannot be read
+   */
+  public double findSpecial(RowWindow rowWindow) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      double value = rowWindow.getRow(i).getDouble(0);
+      ans += value * value;
+    }
+    return ans;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggEvent.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggEvent.java
new file mode 100644
index 0000000000..fbeaa62dc0
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggEvent.java
@@ -0,0 +1,96 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Aggregation by events: accumulates the first input column per event tag.
+ *
+ * <p>Parameters read in {@code beforeStart}:
+ *
+ * <ul>
+ *   <li>{@code Events}: comma-separated list of {@code start|end|tag} entries ({@code start} and
+ *       {@code end} are parsed as longs, {@code tag} as an int).
+ *   <li>{@code func}: {@code "avg"} or {@code "sum"}; any other value produces no output.
+ * </ul>
+ *
+ * <p>A row contributes to every event whose interval {@code [start, end)} contains its timestamp.
+ * One DOUBLE point is emitted per tag that received at least one row, using the tag itself as the
+ * output timestamp.
+ */
+public class UDTFAggEvent implements UDTF {
+
+  List<Temporal> temporal_data = new ArrayList<>();
+  Map<Integer, Double> res = new HashMap<>();
+  Map<Integer, Integer> cnt = new HashMap<>();
+
+  String func;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    String event = parameters.getString("Events");
+    func = parameters.getString("func");
+    for (String entry : event.split(",")) {
+      String[] tmp = entry.split("\\|");
+      temporal_data.add(
+          new Temporal(Long.parseLong(tmp[0]), Long.parseLong(tmp[1]), Integer.parseInt(tmp[2])));
+    }
+    // Order by start time with a comparator that returns 0 for ties; a comparator that never
+    // reports equality violates the Comparator contract.
+    temporal_data.sort(Comparator.comparingLong((Temporal e) -> e.st));
+    // A single window of maximal size hands all rows to one transform() call.
+    configurations
+        .setAccessStrategy(
+            new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE, Integer.MAX_VALUE))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    boolean isAvg = "avg".equals(func);
+    boolean isSum = "sum".equals(func);
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      Row curr = rowWindow.getRow(i);
+      long time = curr.getTime();
+      for (Temporal ev : temporal_data) {
+        if (time < ev.st) {
+          break; // events are sorted by start time, so no later event can contain this row
+        }
+        if (time >= ev.ed) {
+          continue; // row lies after this event; same [st, ed) rule as UDTFRelJoinTemporal
+        }
+        if (isAvg) {
+          cnt.put(ev.tag, cnt.getOrDefault(ev.tag, 0) + 1);
+        }
+        if (isAvg || isSum) {
+          res.put(ev.tag, res.getOrDefault(ev.tag, 0.) + curr.getDouble(0));
+        }
+      }
+    }
+    // Emit without overwriting the running sums, so the accumulators stay sums.
+    for (Map.Entry<Integer, Double> e : res.entrySet()) {
+      double value = e.getValue();
+      if (isAvg) {
+        value /= cnt.get(e.getKey());
+      }
+      collector.putDouble(e.getKey(), value);
+    }
+  }
+
+  /** One event: the interval bounds {@code st} and {@code ed}, and the tag to aggregate under. */
+  private static final class Temporal {
+    final long st;
+    final long ed;
+    final int tag;
+
+    Temporal(long st, long ed, int tag) {
+      this.st = st;
+      this.ed = ed;
+      this.tag = tag;
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggMap.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggMap.java
new file mode 100644
index 0000000000..05eaf21ca7
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggMap.java
@@ -0,0 +1,72 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.io.IOException;
+
+/**
+ * Row-by-row aggregation across the input columns, restricted to a time range.
+ *
+ * <p>Parameters read in {@code beforeStart}:
+ *
+ * <ul>
+ *   <li>{@code func}: {@code "avg"} for the mean of the columns, {@code "spe"} for their sum of
+ *       squares; any other value selects the maximum. {@code "min"} is still accepted as a legacy
+ *       alias of {@code "avg"} because earlier versions exposed the mean under that key.
+ *   <li>{@code start}, {@code end}: inclusive timestamp bounds; rows outside are dropped.
+ * </ul>
+ *
+ * <p>Rows containing a null column are dropped. One DOUBLE point is emitted per remaining row, at
+ * the row time.
+ */
+public class UDTFAggMap implements UDTF {
+  private String func;
+  long st, ed;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    func = parameters.getString("func");
+    st = parameters.getLong("start");
+    ed = parameters.getLong("end");
+    configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    long time = row.getTime();
+    if (time > ed || time < st) { // outside the requested [start, end] range
+      return;
+    }
+    // The helpers below read every column, so every column must be present.
+    for (int i = 0; i < row.size(); i++) {
+      if (row.isNull(i)) {
+        return;
+      }
+    }
+    double res;
+    switch (func) {
+      case "avg":
+      case "min": // legacy key, historically mapped to the mean; kept for existing queries
+        res = findAvg(row);
+        break;
+      case "spe":
+        res = findSpecial(row);
+        break;
+      default:
+        res = findMax(row);
+    }
+    collector.putDouble(time, res);
+  }
+
+  /**
+   * Returns the largest column value of the row.
+   *
+   * @param row row to scan
+   * @return the maximum, or {@code Double.NEGATIVE_INFINITY} when the row has no columns
+   * @throws IOException if a column cannot be read
+   */
+  public double findMax(Row row) throws IOException {
+    // Double.MIN_VALUE is the smallest POSITIVE double and would hide negative maxima.
+    double ans = Double.NEGATIVE_INFINITY;
+    for (int i = 0; i < row.size(); i++) {
+      ans = Double.max(ans, row.getDouble(i));
+    }
+    return ans;
+  }
+
+  /**
+   * Returns the arithmetic mean of the column values of the row.
+   *
+   * @param row row to scan
+   * @return sum of the columns divided by the column count
+   * @throws IOException if a column cannot be read
+   */
+  public double findAvg(Row row) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < row.size(); i++) {
+      ans += row.getDouble(i);
+    }
+    return ans / row.size();
+  }
+
+  /**
+   * Returns the sum of squares of the column values of the row.
+   *
+   * @param row row to scan
+   * @return the sum of {@code v * v} over all columns
+   * @throws IOException if a column cannot be read
+   */
+  public double findSpecial(Row row) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < row.size(); i++) {
+      double value = row.getDouble(i);
+      ans += value * value;
+    }
+    return ans;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggRWindow.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggRWindow.java
new file mode 100644
index 0000000000..0c6f02d806
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFAggRWindow.java
@@ -0,0 +1,75 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.io.IOException;
+
+/**
+ * Aggregation over time-based sliding windows: each window covers a fixed time range.
+ *
+ * <p>Parameters read in {@code beforeStart}:
+ *
+ * <ul>
+ *   <li>{@code func}: {@code "avg"} for the mean, {@code "spe"} for the sum of squares; any other
+ *       value selects the maximum.
+ *   <li>{@code window}: first argument of the {@code SlidingTimeWindowAccessStrategy}.
+ *   <li>{@code skip}: second argument (the sliding step) of the access strategy.
+ *   <li>{@code start}, {@code end}: windows that start after {@code end} or end before {@code
+ *       start} are dropped.
+ * </ul>
+ *
+ * <p>Only the first input column is read. One DOUBLE point is emitted per remaining non-empty
+ * window, at the window start time.
+ */
+public class UDTFAggRWindow implements UDTF {
+  private String func;
+  long st, ed;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    func = parameters.getString("func");
+    long window = parameters.getLong("window");
+    long slide = parameters.getLong("skip");
+    st = parameters.getLong("start");
+    ed = parameters.getLong("end");
+    configurations
+        .setAccessStrategy(new SlidingTimeWindowAccessStrategy(window, slide))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    // Skip empty windows and windows lying entirely outside the requested time range.
+    if (rowWindow.windowSize() == 0
+        || rowWindow.windowStartTime() > ed
+        || rowWindow.windowEndTime() < st) {
+      return;
+    }
+    double res;
+    switch (func) {
+      case "avg":
+        res = findAvg(rowWindow);
+        break;
+      case "spe":
+        res = findSpecial(rowWindow);
+        break;
+      default:
+        res = findMax(rowWindow);
+    }
+    collector.putDouble(rowWindow.windowStartTime(), res);
+  }
+
+  /**
+   * Returns the largest first-column value in the window.
+   *
+   * @param rowWindow window to scan
+   * @return the maximum, or {@code Double.NEGATIVE_INFINITY} for an empty window
+   * @throws IOException if a row cannot be read
+   */
+  public double findMax(RowWindow rowWindow) throws IOException {
+    // Double.MIN_VALUE is the smallest POSITIVE double, so it must not be used as the seed:
+    // a window of negative values would otherwise report 4.9e-324 as its maximum.
+    double ans = Double.NEGATIVE_INFINITY;
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      ans = Double.max(ans, rowWindow.getRow(i).getDouble(0));
+    }
+    return ans;
+  }
+
+  /**
+   * Returns the arithmetic mean of the first-column values in the window.
+   *
+   * @param rowWindow window to scan
+   * @return sum of the values divided by the window size (NaN for an empty window)
+   * @throws IOException if a row cannot be read
+   */
+  public double findAvg(RowWindow rowWindow) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      ans += rowWindow.getRow(i).getDouble(0);
+    }
+    return ans / rowWindow.windowSize();
+  }
+
+  /**
+   * Returns the sum of squares of the first-column values in the window.
+   *
+   * @param rowWindow window to scan
+   * @return the sum of {@code v * v} over all rows (0 for an empty window)
+   * @throws IOException if a row cannot be read
+   */
+  public double findSpecial(RowWindow rowWindow) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < rowWindow.windowSize(); i++) {
+      double value = rowWindow.getRow(i).getDouble(0);
+      ans += value * value;
+    }
+    return ans;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFClosedRange.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFClosedRange.java
new file mode 100644
index 0000000000..4e4284aa97
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFClosedRange.java
@@ -0,0 +1,32 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/** Find subsequence of time-series data satisfying ta<=T<tb. */
+public class UDTFClosedRange implements UDTF {
+  private long lower, upper;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    lower = parameters.getLong("from");
+    upper = parameters.getLong("to");
+    configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (row.isNull(0) || row.isNull(1)) { // skip null rows
+      return;
+    }
+    if (row.getTime() >= this.lower && row.getTime() < this.upper) {
+      collector.putDouble(row.getTime(), row.getDouble(0));
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFClosedValue.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFClosedValue.java
new file mode 100644
index 0000000000..8b29ecb853
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFClosedValue.java
@@ -0,0 +1,32 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/** Find subsequence of time-series data satisfying a<=V<b. */
+public class UDTFClosedValue implements UDTF {
+  private double lower, upper;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    lower = parameters.getDouble("from");
+    upper = parameters.getDouble("to");
+    configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (row.isNull(0) || row.isNull(1)) { // skip null rows
+      return;
+    }
+    if (row.getDouble(0) >= lower && row.getDouble(0) < upper) {
+      collector.putDouble(row.getTime(), row.getDouble(0));
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFMerge.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFMerge.java
new file mode 100644
index 0000000000..46be1f4437
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFMerge.java
@@ -0,0 +1,30 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/** Merge two series with priority. */
+public class UDTFMerge implements UDTF {
+  long imx;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    imx = parameters.getLong("pivot");
+    configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (row.getTime() < imx) {
+      collector.putDouble(row.getTime(), row.getDouble(0));
+    } else {
+      collector.putDouble(row.getTime(), row.getDouble(1));
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFOpenRange.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFOpenRange.java
new file mode 100644
index 0000000000..c638400dd0
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFOpenRange.java
@@ -0,0 +1,32 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/** Find subsequence of time-series data satisfying T < tb || T > ta, ta > tb. */
+public class UDTFOpenRange implements UDTF {
+  private double lessThan, greaterThan;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    lessThan = parameters.getDouble("less_than");
+    greaterThan = parameters.getDouble("greater_than");
+    configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (row.isNull(0) || row.isNull(1)) { // skip null rows
+      return;
+    }
+    if (row.getTime() <= lessThan || row.getTime() >= greaterThan) {
+      collector.putDouble(row.getTime(), row.getDouble(0));
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFOpenValue.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFOpenValue.java
new file mode 100644
index 0000000000..90876504b4
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFOpenValue.java
@@ -0,0 +1,32 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/** Find subsequence of time-series data satisfying V < b || V > a, a > b. */
+public class UDTFOpenValue implements UDTF {
+  private double lessThan, greaterThan;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    lessThan = parameters.getDouble("less_than");
+    greaterThan = parameters.getDouble("greater_than");
+    configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (row.isNull(0) || row.isNull(1)) { // skip null rows
+      return;
+    }
+    if (row.getDouble(0) <= lessThan || row.getDouble(0) >= greaterThan) {
+      collector.putDouble(row.getTime(), row.getDouble(0));
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFPatternCWindow.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFPatternCWindow.java
new file mode 100644
index 0000000000..916f00276d
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFPatternCWindow.java
@@ -0,0 +1,68 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Euclidean distance between a fixed pattern and each count-based sliding window of the first
+ * input column. The window length equals the pattern length and the window advances one row at a
+ * time.
+ *
+ * <p>The pattern is read from the {@code pattern} parameter as colon-separated doubles, e.g.
+ * {@code "1.0:2.5:3"}. One DOUBLE point is emitted per complete window, at the window start time.
+ */
+public class UDTFPatternCWindow implements UDTF {
+  private List<Double> pattern = new ArrayList<>();
+
+  /**
+   * Appends {@code len} random values drawn from {@code Random.nextDouble()} to the pattern. Not
+   * called by this class itself.
+   *
+   * @param len number of values to append
+   */
+  public void init(int len) {
+    Random rd = new Random();
+    for (int i = 0; i < len; i++) {
+      pattern.add(rd.nextDouble());
+    }
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    String[] pat = parameters.getString("pattern").split(":");
+    Function<String, Double> toDouble = Double::parseDouble;
+    pattern = Arrays.stream(pat).map(toDouble).collect(Collectors.toList());
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(pat.length, 1))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    // A window shorter than the pattern cannot be compared point by point, and reading
+    // pattern.size() rows from it would index past the end of the window.
+    if (rowWindow.windowSize() == 0 || rowWindow.windowSize() < pattern.size()) {
+      return;
+    }
+    collector.putDouble(rowWindow.windowStartTime(), distance(rowWindow));
+  }
+
+  /**
+   * Computes the Euclidean distance between the pattern and the first {@code pattern.size()} rows
+   * of the window.
+   *
+   * @param rowWindow window holding at least {@code pattern.size()} rows
+   * @return square root of the summed squared differences
+   * @throws IOException if a row cannot be read
+   */
+  private double distance(RowWindow rowWindow) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < pattern.size(); i++) {
+      double diff = pattern.get(i) - rowWindow.getRow(i).getDouble(0);
+      ans += diff * diff;
+    }
+    return Math.sqrt(ans);
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFPatternRWindow.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFPatternRWindow.java
new file mode 100644
index 0000000000..6ab549a912
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFPatternRWindow.java
@@ -0,0 +1,81 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Euclidean distance between a fixed pattern and each time-based sliding window of the first input
+ * column.
+ *
+ * <p>Each window is cut into {@code pattern.size()} consecutive groups of {@code windowSize /
+ * pattern.size()} rows; every group is averaged and compared with the pattern value at the same
+ * position. Rows left over at the end of the window are ignored.
+ *
+ * <p>Parameters read in {@code beforeStart}: {@code pattern} (colon-separated doubles), {@code
+ * window} and {@code skip} (arguments of the {@code SlidingTimeWindowAccessStrategy}). One DOUBLE
+ * point is emitted per usable window, at the window start time.
+ */
+public class UDTFPatternRWindow implements UDTF {
+  private List<Double> pattern = new ArrayList<>();
+
+  /**
+   * Appends {@code len} random values drawn from {@code Random.nextDouble()} to the pattern. Not
+   * called by this class itself.
+   *
+   * @param len number of values to append
+   */
+  public void init(int len) {
+    Random rd = new Random();
+    for (int i = 0; i < len; i++) {
+      pattern.add(rd.nextDouble());
+    }
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    String[] pat = parameters.getString("pattern").split(":");
+    Function<String, Double> toDouble = Double::parseDouble;
+    pattern = Arrays.stream(pat).map(toDouble).collect(Collectors.toList());
+    long wsz = parameters.getLong("window");
+    long skip = parameters.getLong("skip");
+    configurations
+        .setAccessStrategy(new SlidingTimeWindowAccessStrategy(wsz, skip))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    if (rowWindow.windowSize() == 0) { // nothing to compare
+      return;
+    }
+    int avgPts = rowWindow.windowSize() / pattern.size();
+    // With fewer rows than pattern points every group would be empty and its mean 0/0 = NaN,
+    // so such windows are skipped instead of emitting NaN.
+    if (avgPts == 0) {
+      return;
+    }
+    collector.putDouble(rowWindow.windowStartTime(), distance(rowWindow, avgPts));
+  }
+
+  /**
+   * Computes the Euclidean distance between the pattern and the per-group means of the window.
+   *
+   * @param rowWindow window holding at least {@code pattern.size() * avgPts} rows
+   * @param avgPts number of rows averaged for each pattern point; must be positive
+   * @return square root of the summed squared differences
+   * @throws IOException if a row cannot be read
+   */
+  private double distance(RowWindow rowWindow, int avgPts) throws IOException {
+    double ans = 0.;
+    for (int i = 0; i < pattern.size(); i++) {
+      int base = i * avgPts; // index of the first row of group i
+      double sum = 0.;
+      for (int j = 0; j < avgPts; j++) {
+        sum += rowWindow.getRow(base + j).getDouble(0);
+      }
+      double diff = pattern.get(i) - sum / avgPts;
+      ans += diff * diff;
+    }
+    return Math.sqrt(ans);
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFRelJoinDict.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFRelJoinDict.java
new file mode 100644
index 0000000000..a697699c53
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFRelJoinDict.java
@@ -0,0 +1,38 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Joins a series with a relational (dictionary) table on the series value.
+ *
+ * <p>The table is read from the {@code dict} parameter as colon-separated {@code key|value}
+ * entries, where {@code key} is parsed as an int and {@code value} as a double; the first entry
+ * wins when a key is repeated. For each row the first column is multiplied by 1000 and rounded to
+ * an int; when that int is a key of the table, the mapped value is emitted at the row time.
+ * Rows without a match produce no output.
+ */
+public class UDTFRelJoinDict implements UDTF {
+  Map<Integer, Double> dic = new HashMap<>();
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    String[] dict = parameters.getString("dict").split(":");
+    for (String entry : dict) {
+      String[] app = entry.split("\\|");
+      int key = Integer.parseInt(app[0]);
+      if (!dic.containsKey(key)) { // keep the first value seen for a key
+        dic.put(key, Double.parseDouble(app[1]));
+      }
+    }
+    configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    // A cast binds tighter than '*', so "(int) v * 1000" truncated v before scaling and lost the
+    // fractional digits. Scale first, then round to absorb binary representation error
+    // (e.g. 1.001 * 1000 evaluates to slightly below 1001).
+    int key = (int) Math.rint(row.getDouble(0) * 1000);
+    Double mapped = dic.get(key);
+    if (mapped != null) {
+      collector.putDouble(row.getTime(), mapped);
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFRelJoinTemporal.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFRelJoinTemporal.java
new file mode 100644
index 0000000000..d65f30712b
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFRelJoinTemporal.java
@@ -0,0 +1,50 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Joins a series with a temporal table on the series time.
+ *
+ * <p>The table is read from the {@code Events} parameter as comma-separated {@code start|end|tag}
+ * entries. For every entry whose interval {@code [start, end)} contains the row time, the entry's
+ * tag is emitted as a DOUBLE at the row time.
+ */
+public class UDTFRelJoinTemporal implements UDTF {
+  List<Temporal> temporal_data = new ArrayList<>();
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    String[] entries = parameters.getString("Events").split(",");
+    for (String entry : entries) {
+      String[] fields = entry.split("\\|");
+      long start = Long.parseLong(fields[0]);
+      long end = Long.parseLong(fields[1]);
+      int tag = Integer.parseInt(fields[2]);
+      temporal_data.add(new Temporal(start, end, tag));
+    }
+    configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    for (int i = 0; i < temporal_data.size(); i++) {
+      Temporal rec = temporal_data.get(i);
+      boolean covers = row.getTime() >= rec.st && row.getTime() < rec.ed;
+      if (covers) {
+        collector.putDouble(row.getTime(), rec.tag);
+      }
+    }
+  }
+
+  /** One table entry: the interval bounds {@code st} and {@code ed}, and the tag to emit. */
+  private class Temporal {
+    long st;
+    long ed;
+    int tag;
+
+    public Temporal(long st, long ed, int tag) {
+      this.st = st;
+      this.ed = ed;
+      this.tag = tag;
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSample.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSample.java
new file mode 100644
index 0000000000..f64a8f0841
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSample.java
@@ -0,0 +1,34 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.Random;
+
+/**
+ * Random sampling on a 1d series. Each input point is decided independently: a pseudo-random
+ * number is drawn and the point is forwarded unchanged when that number does not exceed the
+ * {@code rate} attribute (default {@code 0.5}); otherwise the point is dropped.
+ */
+public class UDTFSample implements UDTF {
+  /** Threshold compared against each pseudo-random draw; read from the {@code rate} attribute. */
+  double rate;
+
+  /** Source of the per-point draws, seeded with the wall-clock time at construction. */
+  Random rd = new Random(System.currentTimeMillis());
+
+  /**
+   * Reads the {@code rate} attribute and configures row-by-row access with DOUBLE output.
+   *
+   * @param parameters UDF attributes; {@code rate} is optional
+   * @param configurations receives the access strategy and the output data type
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    this.rate = parameters.getDoubleOrDefault("rate", 0.5);
+    final RowByRowAccessStrategy strategy = new RowByRowAccessStrategy();
+    configurations.setAccessStrategy(strategy).setOutputDataType(Type.DOUBLE);
+  }
+
+  /**
+   * Forwards the row's first column (read as a double) when the row is selected by the draw.
+   *
+   * @param row the current input row
+   * @param collector receives the point when the row is kept
+   */
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    final boolean keep = rd.nextDouble() <= rate;
+    if (!keep) {
+      return;
+    }
+    final long timestamp = row.getTime();
+    final double value = row.getDouble(0);
+    collector.putDouble(timestamp, value);
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSimAlignMulti.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSimAlignMulti.java
new file mode 100644
index 0000000000..9c88c1e19a
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSimAlignMulti.java
@@ -0,0 +1,48 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/**
+ * Joining >=3 series with the tolerance on matching timestamps, defined by eps.
+ *
+ * <p>All input rows are requested as a single window. For each row, neighbouring rows (by index)
+ * are searched for a pair whose timestamps are within {@code eps} of the current row and of each
+ * other; the timestamp of the second row of the first such pair is emitted as a DOUBLE at the
+ * current row's time, or {@code -1} when no pair is found. {@code eps} defaults to {@code 1}.
+ */
+public class UDTFSimAlignMulti implements UDTF {
+  /** Index span of the neighbourhood: rows from {@code i - 5} up to {@code i + 4} are examined. */
+  private static final int NEIGHBOR_SPAN = 5;
+
+  /** Largest accepted absolute timestamp difference; read from the {@code eps} attribute. */
+  double eps;
+
+  /**
+   * Reads the {@code eps} attribute and requests one window of up to {@code Integer.MAX_VALUE}
+   * rows with DOUBLE output.
+   *
+   * @param parameters UDF attributes; {@code eps} is optional
+   * @param configurations receives the access strategy and the output data type
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    eps = parameters.getDoubleOrDefault("eps", 1);
+    configurations
+        .setAccessStrategy(
+            new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE, Integer.MAX_VALUE))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  /**
+   * Emits, for every row of the window, the aligned timestamp found in its neighbourhood.
+   *
+   * @param rowWindow the rows to align
+   * @param collector receives exactly one point per row of the window
+   */
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    final int size = rowWindow.windowSize();
+    for (int i = 0; i < size; i++) {
+      // Copy the timestamp out immediately instead of keeping the Row: RowWindow#getRow may
+      // return one shared Row instance that is repositioned by every call (TODO confirm against
+      // the UDF API), in which case a retained Row would silently point at a later row.
+      final long currTime = rowWindow.getRow(i).getTime();
+      collector.putDouble(currTime, findAlignedTime(rowWindow, i, currTime, size));
+    }
+  }
+
+  /**
+   * Searches the neighbourhood of row {@code index} for the first pair of rows that both lie
+   * within {@code eps} of {@code currTime} and of each other.
+   *
+   * @return the timestamp of the second row of that pair, or {@code -1} if there is none
+   */
+  private long findAlignedTime(RowWindow rowWindow, int index, long currTime, int size)
+      throws Exception {
+    final int firstEnd = Math.min(index + NEIGHBOR_SPAN, size);
+    for (int j = Math.max(index - NEIGHBOR_SPAN, 0); j < firstEnd; j++) {
+      final long time1 = rowWindow.getRow(j).getTime();
+      if (Math.abs(currTime - time1) > eps) {
+        continue;
+      }
+      final int secondEnd = Math.min(j + NEIGHBOR_SPAN, size);
+      for (int k = Math.max(j - NEIGHBOR_SPAN, 0); k < secondEnd; k++) {
+        final long time2 = rowWindow.getRow(k).getTime();
+        if (Math.abs(currTime - time2) <= eps && Math.abs(time1 - time2) <= eps) {
+          // Returning here (rather than testing a -1 marker afterwards) keeps the search
+          // correct even when a matching row carries the timestamp -1.
+          return time2;
+        }
+      }
+    }
+    return -1;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSimJoin.java b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSimJoin.java
new file mode 100644
index 0000000000..36eed6e284
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/query/UDTFSimJoin.java
@@ -0,0 +1,40 @@
+package org.apache.iotdb.library.query;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/**
+ * Joining 2 series with the tolerance on matching timestamps, defined by eps.
+ *
+ * <p>All input rows are requested as a single window. For each row, neighbouring rows (by index)
+ * are compared with it; the timestamp of the last neighbour that lies within {@code eps} of the
+ * current row is emitted as a DOUBLE at the current row's time, or {@code -1} when there is no
+ * such neighbour. {@code eps} defaults to {@code 1}.
+ */
+public class UDTFSimJoin implements UDTF {
+  /** Index span of the neighbourhood: rows from {@code i - 5} up to {@code i + 4} are examined. */
+  private static final int NEIGHBOR_SPAN = 5;
+
+  /** Largest accepted absolute timestamp difference; read from the {@code eps} attribute. */
+  double eps;
+
+  /**
+   * Reads the {@code eps} attribute and requests one window of up to {@code Integer.MAX_VALUE}
+   * rows with DOUBLE output.
+   *
+   * @param parameters UDF attributes; {@code eps} is optional
+   * @param configurations receives the access strategy and the output data type
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    eps = parameters.getDoubleOrDefault("eps", 1);
+    configurations
+        .setAccessStrategy(
+            new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE, Integer.MAX_VALUE))
+        .setOutputDataType(Type.DOUBLE);
+  }
+
+  /**
+   * Emits, for every row of the window, the timestamp of its last neighbour within {@code eps}.
+   *
+   * @param rowWindow the rows to join
+   * @param collector receives exactly one point per row of the window
+   */
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    final int size = rowWindow.windowSize();
+    for (int i = 0; i < size; i++) {
+      // Copy the timestamp out immediately instead of keeping the Row: RowWindow#getRow may
+      // return one shared Row instance that is repositioned by every call (TODO confirm against
+      // the UDF API), in which case a retained Row would report a neighbour's time, both in the
+      // comparison below and as the output timestamp.
+      final long currTime = rowWindow.getRow(i).getTime();
+      long signal = -1;
+      final int end = Math.min(i + NEIGHBOR_SPAN, size);
+      for (int j = Math.max(i - NEIGHBOR_SPAN, 0); j < end; j++) {
+        final long compTime = rowWindow.getRow(j).getTime();
+        if (Math.abs(currTime - compTime) <= eps) {
+          // No break on purpose: the last matching neighbour wins.
+          signal = compTime;
+        }
+      }
+      collector.putDouble(currTime, signal);
+    }
+  }
+}