You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/08 10:01:03 UTC

[iotdb] branch master updated: [IOTDB-2303]Library dmatch (#4835)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e82976def [IOTDB-2303]Library dmatch (#4835)
2e82976def is described below

commit 2e82976def16368212b63fb81dc18173340b0f01
Author: Wendi He <55...@users.noreply.github.com>
AuthorDate: Fri Apr 8 18:00:57 2022 +0800

    [IOTDB-2303]Library dmatch (#4835)
---
 .../org/apache/iotdb/library/dmatch/UDAFCov.java   |  86 +++
 .../org/apache/iotdb/library/dmatch/UDAFDtw.java   |  91 +++
 .../apache/iotdb/library/dmatch/UDAFPearson.java   |  95 +++
 .../apache/iotdb/library/dmatch/UDTFPtnSym.java    |  98 +++
 .../org/apache/iotdb/library/dmatch/UDTFXCorr.java |  86 +++
 .../library/dmatch/util/CrossCorrelation.java      |  54 ++
 .../apache/iotdb/library/dmatch/DMatchTests.java   | 826 +++++++++++++++++++++
 7 files changed, 1336 insertions(+)

diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDAFCov.java b/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDAFCov.java
new file mode 100644
index 0000000000..bf61929ac3
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDAFCov.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.dmatch;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * Computes the covariance between two numeric input series.
+ *
+ * <p>Rows are consumed one at a time and folded into running sums; a single DOUBLE point is
+ * emitted at timestamp 0 once all rows have been seen. The sum of products is normalized by the
+ * number of accepted rows, and the result is NaN when no row was accepted.
+ */
+public class UDAFCov implements UDTF {
+
+  // Running aggregates over the accepted (x, y) pairs.
+  private long pairCount = 0;
+  private double totalX = 0.0;
+  private double totalY = 0.0;
+  private double totalXY = 0.0;
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    // Exactly two input series, both of a numeric type.
+    validator
+        .validateInputSeriesNumber(2)
+        .validateInputSeriesDataType(
+            0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE)
+        .validateInputSeriesDataType(
+            1, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    // Start every execution from a clean slate.
+    pairCount = 0;
+    totalX = 0;
+    totalY = 0;
+    totalXY = 0;
+    configurations
+        .setAccessStrategy(new RowByRowAccessStrategy())
+        .setOutputDataType(TSDataType.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    // Only rows carrying a value in both columns can contribute.
+    if (!row.isNull(0) && !row.isNull(1)) {
+      double x = Util.getValueAsDouble(row, 0);
+      double y = Util.getValueAsDouble(row, 1);
+      // Ignore NaN and infinite values so they cannot poison the sums.
+      if (!Double.isFinite(x) || !Double.isFinite(y)) {
+        return;
+      }
+      pairCount += 1;
+      totalX += x;
+      totalY += y;
+      totalXY += x * y;
+    }
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    // Without any accepted pair the covariance is undefined.
+    if (pairCount <= 0) {
+      collector.putDouble(0, Double.NaN);
+      return;
+    }
+    // (sum(xy) - sum(x) * sum(y) / n) / n
+    double crossTerm = totalX * totalY / pairCount;
+    double covariance = (totalXY - crossTerm) / pairCount;
+    collector.putDouble(0, covariance);
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDAFDtw.java b/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDAFDtw.java
new file mode 100644
index 0000000000..90fae2816e
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDAFDtw.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.dmatch;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList;
+
+/**
+ * Computes the dynamic time warping (DTW) distance between two numeric input series, using the
+ * absolute difference as the point-wise cost, and emits it as a single DOUBLE point at timestamp
+ * 0.
+ *
+ * <p>Rows in which either column is null are ignored. When no usable pair of values exists the
+ * emitted distance is 0.0 (the distance between two empty sequences).
+ */
+public class UDAFDtw implements UDTF {
+
+  // Distance of the most recently processed window; stays 0.0 if no window is ever delivered, so
+  // terminate() always has a value to emit instead of dereferencing an unallocated table.
+  private double distance = 0.0;
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    // Exactly two input series, both of a numeric type.
+    validator
+        .validateInputSeriesNumber(2)
+        .validateInputSeriesDataType(
+            0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE)
+        .validateInputSeriesDataType(
+            1, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    // NOTE(review): the window size of Integer.MAX_VALUE is presumably meant to deliver the whole
+    // input as one window - confirm against the access strategy documentation.
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
+        .setOutputDataType(TSDataType.DOUBLE);
+    // Reset state so a reused instance does not report a stale result.
+    distance = 0.0;
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    DoubleArrayList a = new DoubleArrayList();
+    DoubleArrayList b = new DoubleArrayList();
+    int n = rowWindow.windowSize();
+    for (int i = 0; i < n; i++) {
+      Row row = rowWindow.getRow(i);
+      if (row.isNull(0) || row.isNull(1)) { // keep only rows with a value in both columns
+        continue;
+      }
+      a.add(Util.getValueAsDouble(row, 0));
+      b.add(Util.getValueAsDouble(row, 1));
+    }
+    distance = dtwDistance(a, b);
+  }
+
+  /**
+   * Runs the DTW recurrence over two sequences of equal length.
+   *
+   * <p>Each cell only depends on the current and the previous row of the cost table, so two rows
+   * are kept instead of the full (m + 1) x (m + 1) matrix.
+   *
+   * @param a first sequence
+   * @param b second sequence, expected to have the same size as {@code a}
+   * @return the accumulated cost of the cheapest warping path, or 0.0 for empty sequences
+   */
+  private static double dtwDistance(DoubleArrayList a, DoubleArrayList b) {
+    int m = a.size();
+    double[] previous = new double[m + 1];
+    double[] current = new double[m + 1];
+    // Row 0: only the empty-vs-empty cell is reachable.
+    previous[0] = 0;
+    for (int j = 1; j <= m; j++) {
+      previous[j] = Double.MAX_VALUE;
+    }
+    for (int i = 1; i <= m; i++) {
+      current[0] = Double.MAX_VALUE; // column 0 is unreachable once a point of a is consumed
+      for (int j = 1; j <= m; j++) {
+        current[j] =
+            Math.abs(a.get(i - 1) - b.get(j - 1))
+                + Math.min(Math.min(current[j - 1], previous[j]), previous[j - 1]);
+      }
+      double[] swap = previous;
+      previous = current;
+      current = swap;
+    }
+    return previous[m];
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    collector.putDouble(0, distance);
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDAFPearson.java b/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDAFPearson.java
new file mode 100644
index 0000000000..3a8e97ac1a
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDAFPearson.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.dmatch;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * Computes Pearson's correlation coefficient r between two numeric input series.
+ *
+ * <p>The coefficient is derived from running sums that are updated row by row, and is written as
+ * one DOUBLE point at timestamp 0 after the last row. NaN is written when no row was usable.
+ */
+public class UDAFPearson implements UDTF {
+
+  // Number of usable rows and the power sums accumulated over them.
+  private long n = 0;
+  private double sx = 0.0;
+  private double sy = 0.0;
+  private double sxy = 0.0;
+  private double sxx = 0.0;
+  private double syy = 0.0;
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    // Two numeric input series are required.
+    validator
+        .validateInputSeriesNumber(2)
+        .validateInputSeriesDataType(
+            0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE)
+        .validateInputSeriesDataType(
+            1, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    // Clear all accumulators before the first row arrives.
+    n = 0;
+    sx = 0.0;
+    sy = 0.0;
+    sxy = 0.0;
+    sxx = 0.0;
+    syy = 0.0;
+    configurations
+        .setAccessStrategy(new RowByRowAccessStrategy())
+        .setOutputDataType(TSDataType.DOUBLE);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    boolean incomplete = row.isNull(0) || row.isNull(1);
+    if (incomplete) { // a value is missing on at least one side
+      return;
+    }
+    double x = Util.getValueAsDouble(row, 0);
+    double y = Util.getValueAsDouble(row, 1);
+    boolean usable = Double.isFinite(x) && Double.isFinite(y);
+    if (!usable) { // NaN or infinite values are left out
+      return;
+    }
+    n++;
+    sx += x;
+    sy += y;
+    sxy += x * y;
+    sxx += x * x;
+    syy += y * y;
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    double r;
+    if (n > 0) {
+      // r = (n*Sxy - Sx*Sy) / sqrt(n*Sxx - Sx^2) / sqrt(n*Syy - Sy^2)
+      double numerator = n * sxy - sx * sy;
+      double spreadX = n * sxx - sx * sx;
+      double spreadY = n * syy - sy * sy;
+      r = numerator / Math.sqrt(spreadX) / Math.sqrt(spreadY);
+    } else {
+      // nothing was accumulated, so the coefficient is undefined
+      r = Double.NaN;
+    }
+    collector.putDouble(0, r);
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDTFPtnSym.java b/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDTFPtnSym.java
new file mode 100644
index 0000000000..98478e751b
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDTFPtnSym.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.dmatch;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList;
+
+/**
+ * Finds symmetric patterns in a series according to DTW distance.
+ *
+ * <p>A window of {@code window} points slides over the input one point at a time. For every full
+ * window a symmetry cost is computed; if it does not exceed {@code threshold}, the cost is emitted
+ * at the timestamp of the first point of the window.
+ *
+ * <p>Parameters: {@code window} (positive int, default 10) and {@code threshold} (non-negative
+ * double, default {@link Double#MAX_VALUE}).
+ */
+public class UDTFPtnSym implements UDTF {
+
+  private int window;
+  private double threshold;
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(
+            0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE)
+        .validate(
+            x -> (int) x > 0,
+            "window has to be a positive integer.",
+            validator.getParameters().getIntOrDefault("window", 10))
+        .validate(
+            x -> (double) x >= 0.0d,
+            "threshold has to be non-negative.",
+            validator.getParameters().getDoubleOrDefault("threshold", Double.MAX_VALUE));
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    window = parameters.getIntOrDefault("window", 10);
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(window, 1))
+        .setOutputDataType(TSDataType.DOUBLE);
+    threshold = parameters.getDoubleOrDefault("threshold", Double.MAX_VALUE);
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    if (rowWindow.windowSize() < window) { // skip too short series
+      return;
+    }
+    DoubleArrayList a = new DoubleArrayList();
+    int n = rowWindow.windowSize();
+    long time = rowWindow.getRow(0).getTime();
+    for (int i = 0; i < n; i++) {
+      Row row = rowWindow.getRow(i);
+      a.add(Util.getValueAsDouble(row, 0));
+    }
+    int m = a.size();
+    // dp[i][j] (1-based, inclusive) is the symmetry cost of the interval a[i-1..j-1]: the squared
+    // difference of its two end points plus the cheapest of the three intervals obtained by
+    // dropping the left end, the right end, or both.
+    double[][] dp = new double[m + 1][m + 1];
+    for (int i = 1; i <= m; i++) {
+      dp[i][i] = 0; // a single point is trivially symmetric
+      if (i < m) {
+        dp[i][i + 1] = Math.pow(Math.abs(a.get(i - 1) - a.get(i)), 2);
+      }
+    }
+    // Fill intervals by increasing length. Both ends must advance together so that every interval
+    // of the current length is computed; the shorter intervals read below were filled in earlier
+    // iterations. Advancing only j would leave dp[i + 1][*] at its zero default and reduce the
+    // result to the squared difference of the window's end points.
+    for (int len = 3; len <= m; len++) {
+      for (int i = 1, j = len; j <= m; i++, j++) {
+        dp[i][j] =
+            Math.pow(Math.abs(a.get(i - 1) - a.get(j - 1)), 2)
+                + Math.min(Math.min(dp[i + 1][j], dp[i][j - 1]), dp[i + 1][j - 1]);
+      }
+    }
+    if (dp[1][m] <= threshold) {
+      collector.putDouble(time, dp[1][m]);
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDTFXCorr.java b/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDTFXCorr.java
new file mode 100644
index 0000000000..b72a3a276d
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dmatch/UDTFXCorr.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.dmatch;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.library.dmatch.util.CrossCorrelation;
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+
+/**
+ * Calculates the cross correlation between two numeric input series.
+ *
+ * <p>All rows are buffered first; the correlation values are produced in {@code terminate} and
+ * written at timestamps 0, 1, 2, ... in the order returned by {@link CrossCorrelation}.
+ */
+public class UDTFXCorr implements UDTF {
+
+  // Buffered values of input column 0 and column 1; both lists always have the same length.
+  private final ArrayList<Double> firstSeries = new ArrayList<>();
+  private final ArrayList<Double> secondSeries = new ArrayList<>();
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    // Two numeric input series are required.
+    validator
+        .validateInputSeriesNumber(2)
+        .validateInputSeriesDataType(
+            0, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE)
+        .validateInputSeriesDataType(
+            1, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+  }
+
+  @Override
+  public void beforeStart(UDFParameters udfParameters, UDTFConfigurations udtfConfigurations)
+      throws Exception {
+    udtfConfigurations
+        .setAccessStrategy(new RowByRowAccessStrategy())
+        .setOutputDataType(TSDataType.DOUBLE);
+    firstSeries.clear();
+    secondSeries.clear();
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    // A row missing either value is stored as NaN on both sides, which keeps the two buffers
+    // aligned.
+    boolean incomplete = row.isNull(0) || row.isNull(1);
+    firstSeries.add(incomplete ? Double.NaN : Util.getValueAsDouble(row, 0));
+    secondSeries.add(incomplete ? Double.NaN : Util.getValueAsDouble(row, 1));
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    ArrayList<Double> correlations =
+        CrossCorrelation.calculateCrossCorrelation(firstSeries, secondSeries);
+    // The position in the result list doubles as the output timestamp.
+    int position = 0;
+    for (double correlation : correlations) {
+      collector.putDouble(position, correlation);
+      position++;
+    }
+  }
+
+  @Override
+  public void beforeDestroy() {
+    // Release the buffered values.
+    firstSeries.clear();
+    secondSeries.clear();
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dmatch/util/CrossCorrelation.java b/library-udf/src/main/java/org/apache/iotdb/library/dmatch/util/CrossCorrelation.java
new file mode 100644
index 0000000000..2e3efcc0f5
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dmatch/util/CrossCorrelation.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.dmatch.util;
+
+import java.util.ArrayList;
+
+/** Cross-correlation helper shared by UDTFXCorr and UDTFACF. */
+public class CrossCorrelation {
+  /**
+   * Calculates the cross correlation of two equally long value lists.
+   *
+   * <p>The result holds {@code 2 * n - 1} entries for lists of length {@code n} (and is empty for
+   * {@code n == 0}). The first {@code n} entries pair the head of the first list with the tail of
+   * the second list for overlaps of 1 to n elements; the remaining {@code n - 1} entries pair the
+   * tail of the first list with the head of the second list for shifts of 1 to n - 1. Products in
+   * which either factor is NaN or infinite are left out of the sum.
+   *
+   * <p>NOTE(review): the first n entries are divided by the overlap size, the remaining ones by
+   * the full length n - confirm that this asymmetric normalization is intended.
+   *
+   * @param valueArrayList1 first series
+   * @param valueArrayList2 second series, expected to be at least as long as the first one
+   * @return the correlation values in the order described above
+   */
+  public static ArrayList<Double> calculateCrossCorrelation(
+      ArrayList<Double> valueArrayList1, ArrayList<Double> valueArrayList2) {
+    final int n = valueArrayList1.size();
+    ArrayList<Double> result = new ArrayList<>();
+
+    // Head of list 1 against tail of list 2, overlap growing from 1 to n.
+    for (int overlap = 1; overlap <= n; overlap++) {
+      int offset = n - overlap;
+      double sum = 0.0;
+      for (int k = 0; k < overlap; k++) {
+        double left = valueArrayList1.get(k);
+        if (!Double.isFinite(left)) {
+          continue;
+        }
+        double right = valueArrayList2.get(offset + k);
+        if (Double.isFinite(right)) {
+          sum += left * right;
+        }
+      }
+      result.add(sum / overlap);
+    }
+
+    // Tail of list 1 against head of list 2, shift growing from 1 to n - 1.
+    for (int lag = 1; lag < n; lag++) {
+      double sum = 0.0;
+      for (int k = lag; k < n; k++) {
+        double left = valueArrayList1.get(k);
+        if (!Double.isFinite(left)) {
+          continue;
+        }
+        double right = valueArrayList2.get(k - lag);
+        if (Double.isFinite(right)) {
+          sum += left * right;
+        }
+      }
+      result.add(sum / n);
+    }
+    return result;
+  }
+}
diff --git a/library-udf/src/test/java/org/apache/iotdb/library/dmatch/DMatchTests.java b/library-udf/src/test/java/org/apache/iotdb/library/dmatch/DMatchTests.java
new file mode 100644
index 0000000000..435a7631e6
--- /dev/null
+++ b/library-udf/src/test/java/org/apache/iotdb/library/dmatch/DMatchTests.java
@@ -0,0 +1,826 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.dmatch;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.integration.env.ConfigFactory;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.Assert.fail;
+
+public class DMatchTests {
+  // UDF collector / transformer / reader memory budgets (in MB) as reported by the IoTDB
+  // configuration at the moment this class is initialized.
+  // NOTE(review): presumably kept so the original values can be restored after the tests; the
+  // code that would do so is not visible here - confirm.
+  private static final float oldUdfCollectorMemoryBudgetInMB =
+      IoTDBDescriptor.getInstance().getConfig().getUdfCollectorMemoryBudgetInMB();
+  private static final float oldUdfTransformerMemoryBudgetInMB =
+      IoTDBDescriptor.getInstance().getConfig().getUdfTransformerMemoryBudgetInMB();
+  private static final float oldUdfReaderMemoryBudgetInMB =
+      IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
+
+  /**
+   * One-time fixture for all tests in this class: sets the three UDF memory budgets to 5 MB,
+   * initializes the test environment, then creates the time series, inserts the test data and
+   * registers the UDFs.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // The budgets are set before the environment is initialized.
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(5)
+        .setUdfTransformerMemoryBudgetInMB(5)
+        .setUdfReaderMemoryBudgetInMB(5);
+    EnvFactory.getEnv().initBeforeClass();
+    createTimeSeries();
+    generateData();
+    registerUDF();
+  }
+
+  /**
+   * Registers the storage group {@code root.vehicle} and the time series used by the tests.
+   *
+   * <p>Devices d1, d2 and d4 get sensors s1..s8, device d3 gets s1..s4. Sensor types cycle through
+   * INT32, INT64, FLOAT, DOUBLE (so s5..s8 repeat the types of s1..s4). Every series uses PLAIN
+   * encoding without compression. Series are created in device order, then sensor order.
+   *
+   * @throws MetadataException if the storage group or one of the series cannot be created
+   */
+  private static void createTimeSeries() throws MetadataException {
+    IoTDB.schemaEngine.setStorageGroup(new PartialPath("root.vehicle"));
+    // Type of sensor sN is typeCycle[(N - 1) % 4].
+    TSDataType[] typeCycle = {
+      TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE
+    };
+    // Number of sensors of devices d1..d4, in that order.
+    int[] sensorsPerDevice = {8, 8, 4, 8};
+    for (int device = 1; device <= sensorsPerDevice.length; device++) {
+      for (int sensor = 1; sensor <= sensorsPerDevice[device - 1]; sensor++) {
+        IoTDB.schemaEngine.createTimeseries(
+            new PartialPath("root.vehicle.d" + device + ".s" + sensor),
+            typeCycle[(sensor - 1) % typeCycle.length],
+            TSEncoding.PLAIN,
+            CompressionType.UNCOMPRESSED,
+            null);
+      }
+    }
+  }
+
+  private static void generateData() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              100, 100, 100, 100, 100, 101, 101, 101, 101));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              100, 100, 100, 100, 100, 101, 101, 101, 101));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              200, 102, 102, 102, 102, 101, 101, 101, 101));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              300, 104, 104, 104, 104, 102, 102, 102, 102));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              400, 126, 126, 126, 126, 102, 102, 102, 102));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              500, 108, 108, 108, 108, 103, 103, 103, 103));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              600, 112, 112, 112, 112, 104, 104, 104, 104));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              700, 114, 114, 114, 114, 104, 104, 104, 104));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              800, 116, 116, 116, 116, 105, 105, 105, 105));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              900, 118, 118, 118, 118, 105, 105, 105, 105));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              1000, 100, 100, 100, 100, 106, 106, 106, 106));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              1100, 124, 124, 124, 124, 108, 108, 108, 108));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              1200, 126, 126, 126, 126, 108, 108, 108, 108));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d1(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              1300, 116, 116, 116, 116, 105, 105, 105, 105));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d2(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              100, 1, 1, 1, 1, 2, 2, 2, 2));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d2(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              200, 1, 1, 1, 1, 2, 2, 2, 2));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d2(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              300, 1, 1, 1, 1, 2, 2, 2, 2));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d2(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              400, 1, 1, 1, 1, 2, 2, 2, 2));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d2(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              500, 1, 1, 1, 1, 2, 2, 2, 2));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d2(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              600, 1, 1, 1, 1, 2, 2, 2, 2));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d2(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              700, 1, 1, 1, 1, 2, 2, 2, 2));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d2(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              800, 1, 1, 1, 1, 2, 2, 2, 2));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d3(timestamp,s1,s2,s3,s4) values(%d,%d,%d,%d,%d)",
+              100, 1, 1, 1, 1));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d3(timestamp,s1,s2,s3,s4) values(%d,%d,%d,%d,%d)",
+              200, 2, 2, 2, 2));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d3(timestamp,s1,s2,s3,s4) values(%d,%d,%d,%d,%d)",
+              300, 3, 3, 3, 3));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d3(timestamp,s1,s2,s3,s4) values(%d,%d,%d,%d,%d)",
+              400, 2, 2, 2, 2));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d3(timestamp,s1,s2,s3,s4) values(%d,%d,%d,%d,%d)",
+              500, 1, 1, 1, 1));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d3(timestamp,s1,s2,s3,s4) values(%d,%d,%d,%d,%d)",
+              600, 1, 1, 1, 1));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d3(timestamp,s1,s2,s3,s4) values(%d,%d,%d,%d,%d)",
+              700, 1, 1, 1, 1));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d3(timestamp,s1,s2,s3,s4) values(%d,%d,%d,%d,%d)",
+              800, 2, 2, 2, 2));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d3(timestamp,s1,s2,s3,s4) values(%d,%d,%d,%d,%d)",
+              900, 3, 3, 3, 3));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d3(timestamp,s1,s2,s3,s4) values(%d,%d,%d,%d,%d)",
+              1000, 2, 2, 2, 2));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d3(timestamp,s1,s2,s3,s4) values(%d,%d,%d,%d,%d)",
+              1100, 1, 1, 1, 1));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d4(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              100, 0, 0, 0, 0, 6, 6, 6, 6));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d4(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              200, 2, 2, 2, 2, 7, 7, 7, 7));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d4(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              300, 3, 3, 3, 3, 0, 0, 0, 0));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d4(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              400, 4, 4, 4, 4, 9, 9, 9, 9));
+      statement.addBatch(
+          String.format(
+              "insert into root.vehicle.d4(timestamp,s1,s2,s3,s4,s5,s6,s7,s8) values(%d,%d,%d,%d,%d,%d,%d,%d,%d))",
+              500, 5, 5, 5, 5, 10, 10, 10, 10));
+      statement.executeBatch();
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  /**
+   * Registers the five dmatch UDFs (cov, dtw, pearson, ptnsym, xcorr) under the names the tests
+   * query with. Any {@link SQLException} fails the run through {@code fail}.
+   */
+  private static void registerUDF() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("create function cov as 'org.apache.iotdb.library.dmatch.UDAFCov'");
+      statement.execute("create function dtw as 'org.apache.iotdb.library.dmatch.UDAFDtw'");
+      statement.execute("create function pearson as 'org.apache.iotdb.library.dmatch.UDAFPearson'");
+      statement.execute("create function ptnsym as 'org.apache.iotdb.library.dmatch.UDTFPtnSym'");
+      // The class is UDTFXCorr (capital C); class names are case-sensitive.
+      statement.execute("create function xcorr as 'org.apache.iotdb.library.dmatch.UDTFXCorr'");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  /**
+   * Class-level teardown: cleans the test environment, then writes the saved {@code old...} values
+   * back into the UDF collector, transformer and reader memory budgets.
+   *
+   * <p>NOTE(review): the {@code old...} fields are presumably captured during class setup, which is
+   * outside this view - confirm.
+   *
+   * @throws Exception propagated from the environment cleanup
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(oldUdfCollectorMemoryBudgetInMB)
+        .setUdfTransformerMemoryBudgetInMB(oldUdfTransformerMemoryBudgetInMB)
+        .setUdfReaderMemoryBudgetInMB(oldUdfReaderMemoryBudgetInMB);
+  }
+
+  /**
+   * Queries the {@code cov} UDF on d1.s1 and d1.s5 and expects exactly one row whose first column
+   * is 12.2917 within a tolerance of 0.01.
+   */
+  @Test
+  public void testCov1() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select cov(d1.s1,d1.s5) from root.vehicle");
+      rows.next();
+      final double actual = rows.getDouble(1);
+      Assert.assertEquals(12.291666666666666, actual, 0.01);
+      // Only a single result row is expected.
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code cov} UDF on d1.s2 and d1.s6 and expects exactly one row whose first column
+   * is 12.2917 within a tolerance of 0.01.
+   */
+  @Test
+  public void testCov2() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select cov(d1.s2,d1.s6) from root.vehicle");
+      rows.next();
+      final double actual = rows.getDouble(1);
+      Assert.assertEquals(12.291666666666666, actual, 0.01);
+      // Only a single result row is expected.
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code cov} UDF on d1.s3 and d1.s7 and expects exactly one row whose first column
+   * is 12.2917 within a tolerance of 0.01.
+   */
+  @Test
+  public void testCov3() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select cov(d1.s3,d1.s7) from root.vehicle");
+      rows.next();
+      final double actual = rows.getDouble(1);
+      Assert.assertEquals(12.291666666666666, actual, 0.01);
+      // Only a single result row is expected.
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code cov} UDF on d1.s4 and d1.s8 and expects exactly one row whose first column
+   * is 12.2917 within a tolerance of 0.01.
+   */
+  @Test
+  public void testCov4() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select cov(d1.s4,d1.s8) from root.vehicle");
+      rows.next();
+      final double actual = rows.getDouble(1);
+      Assert.assertEquals(12.291666666666666, actual, 0.01);
+      // Only a single result row is expected.
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code dtw} UDF on d2.s1 and d2.s5 and expects exactly one row whose first column
+   * is 8.0 within a tolerance of 0.01.
+   */
+  @Test
+  public void testDtw1() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select dtw(d2.s1,d2.s5) from root.vehicle");
+      rows.next();
+      final double actual = rows.getDouble(1);
+      Assert.assertEquals(8.0, actual, 0.01);
+      // Only a single result row is expected.
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code dtw} UDF on d2.s2 and d2.s6 and expects exactly one row whose first column
+   * is 8.0 within a tolerance of 0.01.
+   */
+  @Test
+  public void testDtw2() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select dtw(d2.s2,d2.s6) from root.vehicle");
+      rows.next();
+      final double actual = rows.getDouble(1);
+      Assert.assertEquals(8.0, actual, 0.01);
+      // Only a single result row is expected.
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code dtw} UDF on d2.s3 and d2.s7 and expects exactly one row whose first column
+   * is 8.0 within a tolerance of 0.01.
+   */
+  @Test
+  public void testDtw3() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select dtw(d2.s3,d2.s7) from root.vehicle");
+      rows.next();
+      final double actual = rows.getDouble(1);
+      Assert.assertEquals(8.0, actual, 0.01);
+      // Only a single result row is expected.
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code dtw} UDF on d2.s4 and d2.s8 and expects exactly one row whose first column
+   * is 8.0 within a tolerance of 0.01.
+   */
+  @Test
+  public void testDtw4() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select dtw(d2.s4,d2.s8) from root.vehicle");
+      rows.next();
+      final double actual = rows.getDouble(1);
+      Assert.assertEquals(8.0, actual, 0.01);
+      // Only a single result row is expected.
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code pearson} UDF on d1.s1 and d1.s5 and expects exactly one row whose first
+   * column is 0.5631 within a tolerance of 0.01.
+   */
+  @Test
+  public void testPearson1() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select pearson(d1.s1,d1.s5) from root.vehicle");
+      rows.next();
+      final double actual = rows.getDouble(1);
+      Assert.assertEquals(0.5630881927754872, actual, 0.01);
+      // Only a single result row is expected.
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code pearson} UDF on d1.s2 and d1.s6 and expects exactly one row whose first
+   * column is 0.5631 within a tolerance of 0.01.
+   */
+  @Test
+  public void testPearson2() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select pearson(d1.s2,d1.s6) from root.vehicle");
+      rows.next();
+      final double actual = rows.getDouble(1);
+      Assert.assertEquals(0.5630881927754872, actual, 0.01);
+      // Only a single result row is expected.
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code pearson} UDF on d1.s3 and d1.s7 and expects exactly one row whose first
+   * column is 0.5631 within a tolerance of 0.01.
+   */
+  @Test
+  public void testPearson3() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select pearson(d1.s3,d1.s7) from root.vehicle");
+      rows.next();
+      final double actual = rows.getDouble(1);
+      Assert.assertEquals(0.5630881927754872, actual, 0.01);
+      // Only a single result row is expected.
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code pearson} UDF on d1.s4 and d1.s8 and expects exactly one row whose first
+   * column is 0.5631 within a tolerance of 0.01.
+   */
+  @Test
+  public void testPearson4() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select pearson(d1.s4,d1.s8) from root.vehicle");
+      rows.next();
+      final double actual = rows.getDouble(1);
+      Assert.assertEquals(0.5630881927754872, actual, 0.01);
+      // Only a single result row is expected.
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code ptnsym} UDF on d1.s1 with window 3 and threshold 0 and expects exactly two
+   * rows, each with 0.0 in the first column (tolerance 0.01).
+   */
+  @Test
+  public void testPthSym1() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows =
+          stmt.executeQuery(
+              "select ptnsym(d1.s1, 'window'='3', 'threshold'='0') from root.vehicle");
+      // Fetch both rows before asserting anything.
+      final double[] actual = new double[2];
+      for (int i = 0; i < actual.length; i++) {
+        rows.next();
+        actual[i] = rows.getDouble(1);
+      }
+      for (double value : actual) {
+        Assert.assertEquals(0.0, value, 0.01);
+      }
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code ptnsym} UDF on d1.s2 with window 3 and threshold 0 and expects exactly two
+   * rows, each with 0.0 in the first column (tolerance 0.01).
+   */
+  @Test
+  public void testPtnSym2() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows =
+          stmt.executeQuery(
+              "select ptnsym(d1.s2, 'window'='3', 'threshold'='0') from root.vehicle");
+      // Fetch both rows before asserting anything.
+      final double[] actual = new double[2];
+      for (int i = 0; i < actual.length; i++) {
+        rows.next();
+        actual[i] = rows.getDouble(1);
+      }
+      for (double value : actual) {
+        Assert.assertEquals(0.0, value, 0.01);
+      }
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code ptnsym} UDF on d2.s1 with window 3 and threshold 0 and expects exactly two
+   * rows, each with 0.0 in the first column (tolerance 0.01).
+   */
+  @Test
+  public void testPtnSym3() {
+    // The FROM clause must name the path prefix; a bare "from" is not a complete query.
+    String sqlStr = "select ptnsym(d2.s1, 'window'='3', 'threshold'='0') from root.vehicle";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sqlStr);
+      resultSet.next();
+      double result1 = resultSet.getDouble(1);
+      resultSet.next();
+      double result2 = resultSet.getDouble(1);
+      Assert.assertEquals(0.0, result1, 0.01);
+      Assert.assertEquals(0.0, result2, 0.01);
+      Assert.assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code ptnsym} UDF on d2.s2 with window 3 and threshold 0 and expects exactly two
+   * rows, each with 0.0 in the first column (tolerance 0.01).
+   */
+  @Test
+  public void testPtnSym4() {
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows =
+          stmt.executeQuery(
+              "select ptnsym(d2.s2, 'window'='3', 'threshold'='0') from root.vehicle");
+      // Fetch both rows before asserting anything.
+      final double[] actual = new double[2];
+      for (int i = 0; i < actual.length; i++) {
+        rows.next();
+        actual[i] = rows.getDouble(1);
+      }
+      for (double value : actual) {
+        Assert.assertEquals(0.0, value, 0.01);
+      }
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code xcorr} UDF on d4.s1 and d4.s5 and checks the first column of the nine
+   * returned rows, in order, against the expected sequence (tolerance 0.01). A tenth row fails the
+   * test.
+   */
+  @Test
+  public void testXCorr1() {
+    final double[] expected = {0.0, 4.0, 9.6, 13.4, 20.0, 15.6, 9.2, 11.8, 6.0};
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select xcorr(d4.s1,d4.s5) from root.vehicle");
+      // Fetch all rows before asserting anything.
+      final double[] actual = new double[expected.length];
+      for (int i = 0; i < actual.length; i++) {
+        rows.next();
+        actual[i] = rows.getDouble(1);
+      }
+      for (int i = 0; i < expected.length; i++) {
+        Assert.assertEquals(expected[i], actual[i], 0.01);
+      }
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code xcorr} UDF on d4.s2 and d4.s6 and checks the first column of the nine
+   * returned rows, in order, against the expected sequence (tolerance 0.01). A tenth row fails the
+   * test.
+   */
+  @Test
+  public void testXCorr2() {
+    final double[] expected = {0.0, 4.0, 9.6, 13.4, 20.0, 15.6, 9.2, 11.8, 6.0};
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select xcorr(d4.s2, d4.s6) from root.vehicle");
+      // Fetch all rows before asserting anything.
+      final double[] actual = new double[expected.length];
+      for (int i = 0; i < actual.length; i++) {
+        rows.next();
+        actual[i] = rows.getDouble(1);
+      }
+      for (int i = 0; i < expected.length; i++) {
+        Assert.assertEquals(expected[i], actual[i], 0.01);
+      }
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code xcorr} UDF on d4.s3 and d4.s7 and checks the first column of the nine
+   * returned rows, in order, against the expected sequence (tolerance 0.01). A tenth row fails the
+   * test.
+   */
+  @Test
+  public void testXCorr3() {
+    final double[] expected = {0.0, 4.0, 9.6, 13.4, 20.0, 15.6, 9.2, 11.8, 6.0};
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select xcorr(d4.s3, d4.s7) from root.vehicle");
+      // Fetch all rows before asserting anything.
+      final double[] actual = new double[expected.length];
+      for (int i = 0; i < actual.length; i++) {
+        rows.next();
+        actual[i] = rows.getDouble(1);
+      }
+      for (int i = 0; i < expected.length; i++) {
+        Assert.assertEquals(expected[i], actual[i], 0.01);
+      }
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries the {@code xcorr} UDF on d4.s4 and d4.s8 and checks the first column of the nine
+   * returned rows, in order, against the expected sequence (tolerance 0.01). A tenth row fails the
+   * test.
+   */
+  @Test
+  public void testXCorr4() {
+    final double[] expected = {0.0, 4.0, 9.6, 13.4, 20.0, 15.6, 9.2, 11.8, 6.0};
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement stmt = conn.createStatement()) {
+      ResultSet rows = stmt.executeQuery("select xcorr(d4.s4,d4.s8) from root.vehicle");
+      // Fetch all rows before asserting anything.
+      final double[] actual = new double[expected.length];
+      for (int i = 0; i < actual.length; i++) {
+        rows.next();
+        actual[i] = rows.getDouble(1);
+      }
+      for (int i = 0; i < expected.length; i++) {
+        Assert.assertEquals(expected[i], actual[i], 0.01);
+      }
+      Assert.assertFalse(rows.next());
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+}