You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/08/11 03:51:24 UTC

[iotdb] branch master updated: UDF function: MasterRepair (#6892)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4abfa2f32d UDF function: MasterRepair (#6892)
4abfa2f32d is described below

commit 4abfa2f32d74c5eb6df4a2844dd29e14acb4f7b5
Author: Husaimawx <44...@users.noreply.github.com>
AuthorDate: Thu Aug 11 11:51:18 2022 +0800

    UDF function: MasterRepair (#6892)
---
 docs/UserGuide/UDF-Library/Data-Repairing.md       |  61 ++++
 docs/zh/UserGuide/UDF-Library/Data-Repairing.md    |  62 ++++
 .../BuiltinTimeSeriesGeneratingFunctionEnum.java   |   3 +-
 .../db/it/udf/IoTDBUDTFBuiltinFunctionIT.java      | 170 +++++++++++
 .../BuiltinTimeSeriesGeneratingFunction.java       |   3 +-
 .../commons/udf/builtin/UDTFMasterRepair.java      |  97 +++++++
 .../apache/iotdb/commons/udf/utils/KDTreeUtil.java | 314 +++++++++++++++++++++
 .../iotdb/commons/udf/utils/MasterRepairUtil.java  | 300 ++++++++++++++++++++
 8 files changed, 1008 insertions(+), 2 deletions(-)

diff --git a/docs/UserGuide/UDF-Library/Data-Repairing.md b/docs/UserGuide/UDF-Library/Data-Repairing.md
index a6832fa244..2c1740e0c7 100644
--- a/docs/UserGuide/UDF-Library/Data-Repairing.md
+++ b/docs/UserGuide/UDF-Library/Data-Repairing.md
@@ -352,4 +352,65 @@ Output series:
 |2020-01-01T00:00:28.000+08:00|                                            126.0|
 |2020-01-01T00:00:30.000+08:00|                                            128.0|
 +-----------------------------+-------------------------------------------------+
+```
+
+## MasterRepair
+
+### Usage
+
+This function repairs time series using master data.
+
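+**Name**: MasterRepair (registered as the built-in function `MASTER_REPAIR`)
+
+**Input Series:** Supports multiple input series of type INT32 / INT64 / FLOAT / DOUBLE. The number of input series must be even: the series to be repaired come first, followed by the same number of master-data series in the same order.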
+
+**Parameters:**
+
++ `omega`: The window size. It is a non-negative integer in milliseconds. By default, it is estimated from the distances between pairs of tuples at various time differences.
++ `eta`: The distance threshold. It is a positive number. By default, it will be estimated according to the distance distribution of tuples in windows.
++ `k`: The number of neighbors in master data. It is a positive integer. By default, it is estimated from the tuple distance to the k-th nearest neighbor in the master data.
++ `output_column`: The index of the repaired column to output, starting from 1. Defaults to 1, which outputs the repair result of the first column.
+
+**Output Series:** Outputs a single series of type DOUBLE. This series is the selected input column after repairing.
+
+### Examples
+
+Input series:
+
+```
++-----------------------------+------------+------------+------------+------------+------------+------------+
+|                         Time|root.test.t1|root.test.t2|root.test.t3|root.test.m1|root.test.m2|root.test.m3|
++-----------------------------+------------+------------+------------+------------+------------+------------+
+|2021-07-01T12:00:01.000+08:00|        1704|     1154.55|       0.195|        1704|     1154.55|       0.195|
+|2021-07-01T12:00:02.000+08:00|        1702|     1152.30|       0.193|        1702|     1152.30|       0.193|
+|2021-07-01T12:00:03.000+08:00|        1702|     1148.65|       0.192|        1702|     1148.65|       0.192|
+|2021-07-01T12:00:04.000+08:00|        1701|     1145.20|       0.194|        1701|     1145.20|       0.194|
+|2021-07-01T12:00:07.000+08:00|        1703|     1150.55|       0.195|        1703|     1150.55|       0.195|
+|2021-07-01T12:00:08.000+08:00|        1694|     1151.55|       0.193|        1704|     1151.55|       0.193|
+|2021-07-01T12:01:09.000+08:00|        1705|     1153.55|       0.194|        1705|     1153.55|       0.194|
+|2021-07-01T12:01:10.000+08:00|        1706|     1152.30|       0.190|        1706|     1152.30|       0.190|
++-----------------------------+------------+------------+------------+------------+------------+------------+
+```
+
+SQL for query:
+
+```sql
+select MasterRepair(t1,t2,t3,m1,m2,m3) from root.test
+```
+
+Output series:
+
+
+```
++-----------------------------+-------------------------------------------------------------------------------------------+
+|                         Time|MasterRepair(root.test.t1,root.test.t2,root.test.t3,root.test.m1,root.test.m2,root.test.m3)|
++-----------------------------+-------------------------------------------------------------------------------------------+
+|2021-07-01T12:00:01.000+08:00|                                                                                       1704|
+|2021-07-01T12:00:02.000+08:00|                                                                                       1702|
+|2021-07-01T12:00:03.000+08:00|                                                                                       1702|
+|2021-07-01T12:00:04.000+08:00|                                                                                       1701|
+|2021-07-01T12:00:07.000+08:00|                                                                                       1703|
+|2021-07-01T12:00:08.000+08:00|                                                                                       1704|
+|2021-07-01T12:01:09.000+08:00|                                                                                       1705|
+|2021-07-01T12:01:10.000+08:00|                                                                                       1706|
++-----------------------------+-------------------------------------------------------------------------------------------+
 ```
\ No newline at end of file
diff --git a/docs/zh/UserGuide/UDF-Library/Data-Repairing.md b/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
index e75d7c1782..c5d92fbfdd 100644
--- a/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
+++ b/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
@@ -343,4 +343,66 @@ select valuerepair(s1,'method'='LsGreedy') from root.test.d2
 |2020-01-01T00:00:28.000+08:00|                                            126.0|
 |2020-01-01T00:00:30.000+08:00|                                            128.0|
 +-----------------------------+-------------------------------------------------+
+```
+
+## MasterRepair
+
+### 函数简介
+
+本函数实现基于主数据的时间序列数据修复。
+
+**函数名:**MasterRepair
+
+**输入序列:** 支持多个输入序列,类型为 INT32 / INT64 / FLOAT / DOUBLE。输入序列的数量必须为偶数:前一半为待修复的序列,后一半为按相同顺序对应的主数据序列。
+
+**参数:**
+
+- `omega`:算法窗口大小,非负整数(单位为毫秒), 在缺省情况下,算法根据不同时间差下的两个元组距离自动估计该参数。
+- `eta`:算法距离阈值,正数, 在缺省情况下,算法根据窗口中元组的距离分布自动估计该参数。
+- `k`:主数据中的近邻数量,正整数, 在缺省情况下,算法根据主数据中的k个近邻的元组距离自动估计该参数。
+- `output_column`:输出列的序号,默认输出第一列的修复结果。
+
+**输出序列:**输出单个序列,类型为 DOUBLE,序列为所选输入列修复后的结果。
+
+### 使用示例
+
+输入序列:
+
+```
++-----------------------------+------------+------------+------------+------------+------------+------------+
+|                         Time|root.test.t1|root.test.t2|root.test.t3|root.test.m1|root.test.m2|root.test.m3|
++-----------------------------+------------+------------+------------+------------+------------+------------+
+|2021-07-01T12:00:01.000+08:00|        1704|     1154.55|       0.195|        1704|     1154.55|       0.195|
+|2021-07-01T12:00:02.000+08:00|        1702|     1152.30|       0.193|        1702|     1152.30|       0.193|
+|2021-07-01T12:00:03.000+08:00|        1702|     1148.65|       0.192|        1702|     1148.65|       0.192|
+|2021-07-01T12:00:04.000+08:00|        1701|     1145.20|       0.194|        1701|     1145.20|       0.194|
+|2021-07-01T12:00:07.000+08:00|        1703|     1150.55|       0.195|        1703|     1150.55|       0.195|
+|2021-07-01T12:00:08.000+08:00|        1694|     1151.55|       0.193|        1704|     1151.55|       0.193|
+|2021-07-01T12:01:09.000+08:00|        1705|     1153.55|       0.194|        1705|     1153.55|       0.194|
+|2021-07-01T12:01:10.000+08:00|        1706|     1152.30|       0.190|        1706|     1152.30|       0.190|
++-----------------------------+------------+------------+------------+------------+------------+------------+
+```
+
+用于查询的 SQL 语句:
+
+```sql
+select MasterRepair(t1,t2,t3,m1,m2,m3) from root.test
+```
+
+输出序列:
+
+
+```
++-----------------------------+-------------------------------------------------------------------------------------------+
+|                         Time|MasterRepair(root.test.t1,root.test.t2,root.test.t3,root.test.m1,root.test.m2,root.test.m3)|
++-----------------------------+-------------------------------------------------------------------------------------------+
+|2021-07-01T12:00:01.000+08:00|                                                                                       1704|
+|2021-07-01T12:00:02.000+08:00|                                                                                       1702|
+|2021-07-01T12:00:03.000+08:00|                                                                                       1702|
+|2021-07-01T12:00:04.000+08:00|                                                                                       1701|
+|2021-07-01T12:00:07.000+08:00|                                                                                       1703|
+|2021-07-01T12:00:08.000+08:00|                                                                                       1704|
+|2021-07-01T12:01:09.000+08:00|                                                                                       1705|
+|2021-07-01T12:01:10.000+08:00|                                                                                       1706|
++-----------------------------+-------------------------------------------------------------------------------------------+
 ```
\ No newline at end of file
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
index e18223182c..8ab1490d6c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinTimeSeriesGeneratingFunctionEnum.java
@@ -74,7 +74,8 @@ public enum BuiltinTimeSeriesGeneratingFunctionEnum {
   EQUAL_SIZE_BUCKET_AGG_SAMPLE("EQUAL_SIZE_BUCKET_AGG_SAMPLE"),
   EQUAL_SIZE_BUCKET_M4_SAMPLE("EQUAL_SIZE_BUCKET_M4_SAMPLE"),
   EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE("EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE"),
-  JEXL("JEXL");
+  JEXL("JEXL"),
+  MASTER_REPAIR("MASTER_REPAIR");
 
   private final String functionName;
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java
index 131bb9d399..759b3f53d1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDTFBuiltinFunctionIT.java
@@ -1232,6 +1232,176 @@ public class IoTDBUDTFBuiltinFunctionIT {
     }
   }
 
+  @Test
+  public void testMasterRepair() {
+    // create time series with master data
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.testMasterRepair");
+      statement.execute(
+          "CREATE TIMESERIES root.testMasterRepair.d1.s1 with datatype=FLOAT,encoding=PLAIN");
+      statement.execute(
+          "CREATE TIMESERIES root.testMasterRepair.d1.s2 with datatype=FLOAT,encoding=PLAIN");
+      statement.execute(
+          "CREATE TIMESERIES root.testMasterRepair.d1.s3 with datatype=FLOAT,encoding=PLAIN");
+      statement.execute(
+          "CREATE TIMESERIES root.testMasterRepair.d1.m1 with datatype=FLOAT,encoding=PLAIN");
+      statement.execute(
+          "CREATE TIMESERIES root.testMasterRepair.d1.m2 with datatype=FLOAT,encoding=PLAIN");
+      statement.execute(
+          "CREATE TIMESERIES root.testMasterRepair.d1.m3 with datatype=FLOAT,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+
+    String[] INSERT_SQL = {
+      "insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (1,1704,1154.55,0.195,1704,1154.55,0.195)",
+      "insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (2,1702,1152.30,0.193,1702,1152.30,0.193)",
+      "insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (3,1702,1148.65,0.192,1702,1148.65,0.192)",
+      "insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (4,1701,1145.20,0.194,1701,1145.20,0.194)",
+      "insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (7,1703,1150.55,0.195,1703,1150.55,0.195)",
+      "insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (8,1694,1151.55,0.193,1704,1151.55,0.193)",
+      "insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (9,1705,1153.55,0.194,1705,1153.55,0.194)",
+      "insert into root.testMasterRepair.d1(time, s1, s2, s3, m1, m2, m3) values (10,1706,1152.30,0.190,1706,1152.30,0.190)",
+    };
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (String dataGenerationSql : INSERT_SQL) {
+        statement.execute(dataGenerationSql);
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      int[] timestamps = {1, 2, 3, 4, 7, 8, 9, 10};
+
+      // test 1
+      double[] r1 = {1704.0, 1702.0, 1702.0, 1701.0, 1703.0, 1702.0, 1705.0, 1706.0};
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "select master_repair(s1,s2,s3,m1,m2,m3) from root.testMasterRepair.d1")) {
+        int columnCount = resultSet.getMetaData().getColumnCount();
+        assertEquals(1 + 1, columnCount);
+        for (int i = 0; i < timestamps.length; i++) {
+          resultSet.next();
+          long expectedTimestamp = timestamps[i];
+          long actualTimestamp = Long.parseLong(resultSet.getString(1));
+          assertEquals(expectedTimestamp, actualTimestamp);
+          double expectedResult = r1[i];
+          double actualResult = resultSet.getDouble(2);
+          double delta = 0.001;
+          assertEquals(expectedResult, actualResult, delta);
+        }
+      }
+
+      // test 2
+      double[] r2 = {1154.55, 1152.30, 1148.65, 1145.20, 1150.55, 1152.30, 1153.55, 1152.30};
+
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "select master_repair(s1,s2,s3,m1,m2,m3,'output_column'='2') from root.testMasterRepair.d1")) {
+        int columnCount = resultSet.getMetaData().getColumnCount();
+        assertEquals(1 + 1, columnCount);
+        for (int i = 0; i < timestamps.length; i++) {
+          resultSet.next();
+          long expectedTimestamp = timestamps[i];
+          long actualTimestamp = Long.parseLong(resultSet.getString(1));
+          assertEquals(expectedTimestamp, actualTimestamp);
+
+          double expectedResult = r2[i];
+          double actualResult = resultSet.getDouble(2);
+          double delta = 0.001;
+          assertEquals(expectedResult, actualResult, delta);
+        }
+      }
+
+      // test 3
+      double[] r3 = {0.195, 0.193, 0.192, 0.194, 0.195, 0.193, 0.194, 0.190};
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "select master_repair(s1,s2,s3,m1,m2,m3,'output_column'='3') from root.testMasterRepair.d1")) {
+        int columnCount = resultSet.getMetaData().getColumnCount();
+        assertEquals(1 + 1, columnCount);
+        for (int i = 0; i < timestamps.length; i++) {
+          resultSet.next();
+          long expectedTimestamp = timestamps[i];
+          long actualTimestamp = Long.parseLong(resultSet.getString(1));
+          assertEquals(expectedTimestamp, actualTimestamp);
+
+          double expectedResult = r3[i];
+          double actualResult = resultSet.getDouble(2);
+          double delta = 0.001;
+          assertEquals(expectedResult, actualResult, delta);
+        }
+      }
+
+      // test 4
+      double[] r4 = {1704.0, 1702.0, 1702.0, 1701.0, 1703.0, 1704.0, 1705.0, 1706.0};
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "select master_repair(s1,s2,s3,m1,m2,m3,'omega'='2','eta'='3.0','k'='5') from root.testMasterRepair.d1")) {
+        int columnCount = resultSet.getMetaData().getColumnCount();
+        assertEquals(1 + 1, columnCount);
+        for (int i = 0; i < timestamps.length; i++) {
+          resultSet.next();
+          long expectedTimestamp = timestamps[i];
+          long actualTimestamp = Long.parseLong(resultSet.getString(1));
+          assertEquals(expectedTimestamp, actualTimestamp);
+
+          double expectedResult = r4[i];
+          double actualResult = resultSet.getDouble(2);
+          double delta = 0.001;
+          assertEquals(expectedResult, actualResult, delta);
+        }
+      }
+
+      // test 5
+      double[] r5 = {1154.55, 1152.30, 1148.65, 1145.20, 1150.55, 1151.55, 1153.55, 1152.30};
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "select master_repair(s1,s2,s3,m1,m2,m3,'omega'='2','eta'='3.0','k'='5','output_column'='2') from root.testMasterRepair.d1")) {
+        int columnCount = resultSet.getMetaData().getColumnCount();
+        assertEquals(1 + 1, columnCount);
+        for (int i = 0; i < timestamps.length; i++) {
+          resultSet.next();
+          long expectedTimestamp = timestamps[i];
+          long actualTimestamp = Long.parseLong(resultSet.getString(1));
+          assertEquals(expectedTimestamp, actualTimestamp);
+
+          double expectedResult = r5[i];
+          double actualResult = resultSet.getDouble(2);
+          double delta = 0.001;
+          assertEquals(expectedResult, actualResult, delta);
+        }
+      }
+
+      // test 6
+      double[] r6 = {0.195, 0.193, 0.192, 0.194, 0.195, 0.193, 0.194, 0.190};
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "select master_repair(s1,s2,s3,m1,m2,m3,'omega'='2','eta'='3.0','k'='5','output_column'='3') from root.testMasterRepair.d1")) {
+        int columnCount = resultSet.getMetaData().getColumnCount();
+        assertEquals(1 + 1, columnCount);
+        for (int i = 0; i < timestamps.length; i++) {
+          resultSet.next();
+          long expectedTimestamp = timestamps[i];
+          long actualTimestamp = Long.parseLong(resultSet.getString(1));
+          assertEquals(expectedTimestamp, actualTimestamp);
+
+          double expectedResult = r6[i];
+          double actualResult = resultSet.getDouble(2);
+          double delta = 0.001;
+          assertEquals(expectedResult, actualResult, delta);
+        }
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
   @Test
   public void testDeDup() {
     String[] createSQLs =
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
index eaeb7b4edf..7f35189ef5 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
@@ -88,7 +88,8 @@ public enum BuiltinTimeSeriesGeneratingFunction {
   EQUAL_SIZE_BUCKET_M4_SAMPLE("EQUAL_SIZE_BUCKET_M4_SAMPLE", UDTFEqualSizeBucketM4Sample.class),
   EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE(
       "EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE", UDTFEqualSizeBucketOutlierSample.class),
-  JEXL("JEXL", UDTFJexl.class);
+  JEXL("JEXL", UDTFJexl.class),
+  MASTER_REPAIR("MASTER_REPAIR", UDTFMasterRepair.class);
 
   private final String functionName;
   private final Class<?> functionClass;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMasterRepair.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMasterRepair.java
new file mode 100644
index 0000000000..23b711ba7e
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFMasterRepair.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.udf.builtin;
+
+import org.apache.iotdb.commons.udf.utils.MasterRepairUtil;
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.ArrayList;
+
+public class UDTFMasterRepair implements UDTF {
+  private MasterRepairUtil masterRepairUtil;
+  private int outputColumn;
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    for (int i = 0; i < validator.getParameters().getChildExpressionsSize(); i++) {
+      validator.validateInputSeriesDataType(i, Type.DOUBLE, Type.FLOAT, Type.INT32, Type.INT64);
+    }
+    if (validator.getParameters().hasAttribute("omega")) {
+      validator.validate(
+          omega -> (int) omega >= 0,
+          "Parameter omega should be non-negative.",
+          validator.getParameters().getInt("omega"));
+    }
+    if (validator.getParameters().hasAttribute("eta")) {
+      validator.validate(
+          eta -> (double) eta > 0,
+          "Parameter eta should be larger than 0.",
+          validator.getParameters().getDouble("eta"));
+    }
+    if (validator.getParameters().hasAttribute("k")) {
+      validator.validate(
+          k -> (int) k > 0,
+          "Parameter k should be a positive integer.",
+          validator.getParameters().getInt("k"));
+    }
+    if (validator.getParameters().hasAttribute("output_column")) {
+      validator.validate(
+          outputColumn -> (int) outputColumn > 0,
+          "Parameter output_column should be a positive integer.",
+          validator.getParameters().getInt("output_column"));
+    }
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    configurations.setAccessStrategy(new RowByRowAccessStrategy());
+    configurations.setOutputDataType(Type.DOUBLE);
+    int columnCnt = parameters.getDataTypes().size() / 2;
+    long omega = parameters.getLongOrDefault("omega", -1);
+    double eta = parameters.getDoubleOrDefault("eta", Double.NaN);
+    int k = parameters.getIntOrDefault("k", -1);
+    masterRepairUtil = new MasterRepairUtil(columnCnt, omega, eta, k);
+    outputColumn = parameters.getIntOrDefault("output_column", 1);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (!masterRepairUtil.isNullRow(row)) {
+      masterRepairUtil.addRow(row);
+    }
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    masterRepairUtil.repair();
+    ArrayList<Long> times = masterRepairUtil.getTime();
+    ArrayList<Double> column = masterRepairUtil.getCleanResultColumn(this.outputColumn);
+    for (int i = 0; i < column.size(); i++) {
+      collector.putDouble(times.get(i), column.get(i));
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/KDTreeUtil.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/KDTreeUtil.java
new file mode 100644
index 0000000000..4df4e7cf79
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/KDTreeUtil.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.udf.utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Stack;
+
+import static java.lang.Math.min;
+import static java.lang.Math.sqrt;
+
/**
 * A k-d tree over fixed-dimension numeric tuples, used for nearest-neighbour look-ups.
 *
 * <p>Tuples are {@code ArrayList<Double>}. Distances are Euclidean after dividing dimension {@code
 * i} by {@code std[i]}: {@code sqrt(sum_i(((a_i - b_i) / std_i)^2))}. Dimensions in which either
 * tuple holds {@code null} are ignored.
 */
public class KDTreeUtil {
  // Root of the tree. A root that is neither a leaf nor has children means "built from no data".
  private Node kdTree;

  private static class Node {
    // Dimension an internal node splits on.
    int partitionDimension;
    // Split value of an internal node: the median of partitionDimension over its subtree.
    double partitionValue;
    // Stored tuple; only set on leaves.
    ArrayList<Double> value;
    boolean isLeaf = false;
    Node left;
    Node right;
    // Per-dimension minimum over every tuple in the subtree (internal nodes only).
    ArrayList<Double> min;
    // Per-dimension maximum over every tuple in the subtree (internal nodes only).
    ArrayList<Double> max;
  }

  /**
   * Builds a tree from {@code input}.
   *
   * @param input tuples to index; values in the first {@code dimension} positions must be non-null
   *     (they are unboxed while choosing splits)
   * @param dimension number of leading dimensions considered for splitting
   * @return the new tree; it is empty when {@code input} is empty
   */
  public static KDTreeUtil build(ArrayList<ArrayList<Double>> input, int dimension) {
    KDTreeUtil tree = new KDTreeUtil();
    tree.kdTree = new Node();
    tree.buildDetail(tree.kdTree, input, dimension);
    return tree;
  }

  /**
   * Recursively fills {@code node} from {@code data}. A single tuple, or tuples that are identical
   * in every split dimension, become a leaf; otherwise the node splits at the median of the
   * highest-variance dimension.
   */
  private void buildDetail(Node node, ArrayList<ArrayList<Double>> data, int dimensions) {
    if (data.isEmpty()) {
      return;
    }
    if (data.size() == 1) {
      node.isLeaf = true;
      node.value = data.get(0);
      return;
    }
    node.partitionDimension = -1;
    double var = -1;
    for (int i = 0; i < dimensions; i++) {
      double tmpvar = UtilZ.variance(data, i);
      if (tmpvar > var) {
        var = tmpvar;
        node.partitionDimension = i;
      }
    }
    // Zero variance everywhere: all tuples coincide, keep one representative.
    if (var == 0d) {
      node.isLeaf = true;
      node.value = data.get(0);
      return;
    }
    node.partitionValue = UtilZ.median(data, node.partitionDimension);

    ArrayList<ArrayList<Double>> maxMin = UtilZ.maxMin(data, dimensions);
    node.min = maxMin.get(0);
    node.max = maxMin.get(1);

    ArrayList<ArrayList<Double>> left = new ArrayList<>();
    ArrayList<ArrayList<Double>> right = new ArrayList<>();
    for (ArrayList<Double> d : data) {
      if (d.get(node.partitionDimension) < node.partitionValue) {
        left.add(d);
      } else if (d.get(node.partitionDimension) > node.partitionValue) {
        right.add(d);
      }
    }
    // Tuples equal to the median go right, except that one goes left when nothing is smaller than
    // the median; with positive variance this keeps both children non-empty.
    for (ArrayList<Double> d : data) {
      if (d.get(node.partitionDimension) == node.partitionValue) {
        if (left.isEmpty()) {
          left.add(d);
        } else {
          right.add(d);
        }
      }
    }

    Node leftNode = new Node();
    Node rightNode = new Node();
    node.left = leftNode;
    node.right = rightNode;
    buildDetail(leftNode, left, dimensions);
    buildDetail(rightNode, right, dimensions);
  }

  /** True when the tree was built from an empty input and therefore holds no tuple. */
  private boolean isEmpty() {
    return kdTree == null || (!kdTree.isLeaf && kdTree.left == null);
  }

  /**
   * Walks from {@code start} down to a leaf, following the side of each split that contains
   * {@code input} and pushing the other child onto {@code stack}. A null coordinate in the split
   * dimension goes right; the choice only affects search order, not the result.
   */
  private static Node descendToLeaf(Node start, ArrayList<Double> input, Stack<Node> stack) {
    Node node = start;
    while (!node.isLeaf) {
      Double coordinate = input.get(node.partitionDimension);
      if (coordinate != null && coordinate < node.partitionValue) {
        stack.push(node.right);
        node = node.left;
      } else {
        stack.push(node.left);
        node = node.right;
      }
    }
    return node;
  }

  /**
   * Returns the stored tuple nearest to {@code input}.
   *
   * @param input query tuple
   * @param std per-dimension scale factors
   * @return the nearest tuple, or {@code null} when the tree is empty
   */
  public ArrayList<Double> query(ArrayList<Double> input, double[] std) {
    if (isEmpty()) {
      return null;
    }
    Stack<Node> stack = new Stack<>();
    Node leaf = descendToLeaf(kdTree, input, stack);
    double distance = UtilZ.distance(input, leaf.value, std);
    ArrayList<Double> nearest = queryRec(input, distance, stack, std);
    return nearest == null ? leaf.value : nearest;
  }

  /**
   * Continues a branch-and-bound search over the subtrees on {@code stack}; a subtree is skipped
   * when the distance from {@code input} to its bounding box is not below the best distance so
   * far.
   *
   * @return the closest tuple strictly nearer than {@code distance}, or {@code null} if none
   */
  public ArrayList<Double> queryRec(
      ArrayList<Double> input, double distance, Stack<Node> stack, double[] std) {
    ArrayList<Double> nearest = null;
    while (!stack.isEmpty()) {
      Node node = stack.pop();
      if (!node.isLeaf) {
        if (!(UtilZ.minDistance(input, node.max, node.min, std) < distance)) {
          continue;
        }
        node = descendToLeaf(node, input, stack);
      }
      double candidate = UtilZ.distance(input, node.value, std);
      if (candidate < distance) {
        distance = candidate;
        nearest = node.value;
      }
    }
    return nearest;
  }

  /**
   * Same search as {@link #queryRec}, but records every tuple that improved the best distance,
   * in the order found. The last element is the nearest; the others are earlier, farther
   * candidates, so this is not a full k-nearest-neighbour search.
   */
  public ArrayList<ArrayList<Double>> queryRecKNN(
      ArrayList<Double> input, double distance, Stack<Node> stack, double[] std) {
    ArrayList<ArrayList<Double>> nearest = new ArrayList<>();
    while (!stack.isEmpty()) {
      Node node = stack.pop();
      if (!node.isLeaf) {
        if (!(UtilZ.minDistance(input, node.max, node.min, std) < distance)) {
          continue;
        }
        node = descendToLeaf(node, input, stack);
      }
      double candidate = UtilZ.distance(input, node.value, std);
      if (candidate < distance) {
        distance = candidate;
        nearest.add(node.value);
      }
    }
    return nearest;
  }

  /**
   * Removes from {@code nearest} and returns the tuple closest to {@code input}.
   *
   * @param nearest non-empty candidate list; it is modified in place
   */
  public ArrayList<Double> findNearest(
      ArrayList<Double> input, ArrayList<ArrayList<Double>> nearest, double[] std) {
    double minDis = Double.MAX_VALUE;
    int minIndex = 0;
    for (int i = 0; i < nearest.size(); i++) {
      double dis = UtilZ.distance(input, nearest.get(i), std);
      if (dis < minDis) {
        minDis = dis;
        minIndex = i;
      }
    }
    return nearest.remove(minIndex);
  }

  /**
   * Returns up to {@code k} candidate neighbours of {@code input}, nearest first.
   *
   * <p>Candidates are the successively-improving tuples found by {@link #queryRecKNN}; if none
   * improved on the first leaf reached, that leaf alone is returned. An empty tree yields an empty
   * list.
   */
  public ArrayList<ArrayList<Double>> queryKNN(ArrayList<Double> input, int k, double[] std) {
    ArrayList<ArrayList<Double>> kNearest = new ArrayList<>();
    if (isEmpty()) {
      return kNearest;
    }
    Stack<Node> stack = new Stack<>();
    Node leaf = descendToLeaf(kdTree, input, stack);
    double distance = UtilZ.distance(input, leaf.value, std);
    ArrayList<ArrayList<Double>> candidates = queryRecKNN(input, distance, stack, std);
    // Fix the count up front: findNearest removes from candidates, so re-evaluating
    // min(k, candidates.size()) each iteration would stop after about half of them.
    int count = min(k, candidates.size());
    for (int i = 0; i < count; i++) {
      kNearest.add(findNearest(input, candidates, std));
    }
    if (kNearest.isEmpty()) {
      kNearest.add(leaf.value);
    }
    return kNearest;
  }

  private static class UtilZ {

    /** Population variance of dimension {@code dimension} over {@code data}. */
    static double variance(ArrayList<ArrayList<Double>> data, int dimension) {
      double sum = 0d;
      for (ArrayList<Double> d : data) {
        sum += d.get(dimension);
      }
      double avg = sum / data.size();
      double ans = 0d;
      for (ArrayList<Double> d : data) {
        double temp = d.get(dimension) - avg;
        ans += temp * temp;
      }
      return ans / data.size();
    }

    /** Upper median (element at index size/2 after sorting) of dimension {@code dimension}. */
    static double median(ArrayList<ArrayList<Double>> data, int dimension) {
      ArrayList<Double> d = new ArrayList<>();
      for (ArrayList<Double> k : data) {
        d.add(k.get(dimension));
      }
      Collections.sort(d);
      return d.get(d.size() / 2);
    }

    /**
     * Per-dimension bounding box of {@code data}.
     *
     * @return a two-element list: index 0 holds the minima, index 1 the maxima
     */
    static ArrayList<ArrayList<Double>> maxMin(ArrayList<ArrayList<Double>> data, int dimensions) {
      ArrayList<Double> minValues = new ArrayList<>(dimensions);
      ArrayList<Double> maxValues = new ArrayList<>(dimensions);
      for (int i = 0; i < dimensions; i++) {
        // Seed with infinities (Double.MIN_VALUE is the smallest *positive* double) and scan
        // every tuple, updating min and max independently.
        double minTemp = Double.POSITIVE_INFINITY;
        double maxTemp = Double.NEGATIVE_INFINITY;
        for (ArrayList<Double> d : data) {
          double v = d.get(i);
          if (v < minTemp) {
            minTemp = v;
          }
          if (v > maxTemp) {
            maxTemp = v;
          }
        }
        minValues.add(minTemp);
        maxValues.add(maxTemp);
      }
      ArrayList<ArrayList<Double>> mm = new ArrayList<>();
      mm.add(minValues);
      mm.add(maxValues);
      return mm;
    }

    /** Scaled Euclidean distance, skipping dimensions where either value is null. */
    static double distance(ArrayList<Double> a, ArrayList<Double> b, double[] std) {
      double sum = 0d;
      for (int i = 0; i < a.size(); i++) {
        if (a.get(i) != null && b.get(i) != null) {
          sum += Math.pow((a.get(i) - b.get(i)) / std[i], 2);
        }
      }
      return sqrt(sum);
    }

    /**
     * Scaled distance from {@code a} to the box [{@code min}, {@code max}]. Null coordinates
     * contribute nothing, matching {@link #distance}.
     */
    static double minDistance(
        ArrayList<Double> a, ArrayList<Double> max, ArrayList<Double> min, double[] std) {
      double sum = 0d;
      for (int i = 0; i < a.size(); i++) {
        Double v = a.get(i);
        if (v == null) {
          continue;
        }
        if (v > max.get(i)) {
          sum += Math.pow((v - max.get(i)) / std[i], 2);
        } else if (v < min.get(i)) {
          sum += Math.pow((min.get(i) - v) / std[i], 2);
        }
      }
      return sqrt(sum);
    }
  }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/MasterRepairUtil.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/MasterRepairUtil.java
new file mode 100644
index 0000000000..28ae8bb083
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/MasterRepairUtil.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.udf.utils;
+
+import org.apache.iotdb.udf.api.access.Row;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Cleans multi-column time series with master data.
+ *
+ * <p>Each row passed to {@link #addRow(Row)} is split into a time-series tuple (the first {@code
+ * columnCnt} columns) and a master tuple (the remaining columns). {@link #repair()} then replaces
+ * every time-series tuple with a master tuple taken from its nearest neighbours (looked up in a
+ * {@link KDTreeUtil} built over the master tuples), preferring candidates that stay within {@code
+ * eta} of the tuples already cleaned inside the preceding {@code omega} time window.
+ */
+public class MasterRepairUtil {
+  /** Time-series tuples, one per row with at least one non-null time-series value. */
+  private final ArrayList<ArrayList<Double>> td = new ArrayList<>();
+  /** Repaired tuples, index-aligned with {@link #td}; filled by {@link #masterRepair()}. */
+  private final ArrayList<ArrayList<Double>> tdCleaned = new ArrayList<>();
+  /** Master tuples, one per row with at least one non-null master value. */
+  private final ArrayList<ArrayList<Double>> md = new ArrayList<>();
+  /** Timestamps of the tuples in {@link #td}, index-aligned with it. */
+  private final ArrayList<Long> tdTime = new ArrayList<>();
+  /** Number of leading row columns that form the time-series tuple. */
+  private final int columnCnt;
+  /** Time window length; {@code -1} means "estimate it in {@link #setParameters()}". */
+  private long omega;
+  /** Smoothness threshold; {@code NaN} means "estimate it in {@link #setParameters()}". */
+  private double eta;
+  /** Number of nearest neighbours per query; {@code -1} means "estimate it". */
+  private int k;
+  /** Per-column standard deviation of {@link #td}, used to scale every distance. */
+  private double[] std;
+  private KDTreeUtil kdTreeUtil;
+
+  /**
+   * Creates a repairer.
+   *
+   * @param columnCnt number of leading columns of each row that belong to the time series; the
+   *     remaining columns are read as master data
+   * @param omega time window length, or {@code -1} to estimate it from the data
+   * @param eta smoothness threshold, or {@code NaN} to estimate it from the data
+   * @param k number of nearest neighbours, or {@code -1} to estimate it from the data
+   */
+  public MasterRepairUtil(int columnCnt, long omega, double eta, int k) {
+    this.columnCnt = columnCnt;
+    this.omega = omega;
+    this.eta = eta;
+    this.k = k;
+  }
+
+  /** Returns {@code true} if every column of {@code row} is null. */
+  public boolean isNullRow(Row row) {
+    for (int i = 0; i < row.size(); i++) {
+      if (!row.isNull(i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Splits {@code row} into a time-series tuple (columns {@code [0, columnCnt)}) and a master
+   * tuple (columns {@code [columnCnt, row.size())}) and stores each one that has at least one
+   * non-null value. Null cells are kept as {@code null} entries.
+   *
+   * @throws Exception if a non-null cell is not numeric or cannot be read
+   */
+  public void addRow(Row row) throws Exception {
+    ArrayList<Double> tt = readTuple(row, 0, this.columnCnt); // time-series tuple
+    if (tt != null) {
+      td.add(tt);
+      tdTime.add(row.getTime());
+    }
+    ArrayList<Double> mt = readTuple(row, this.columnCnt, row.size()); // master tuple
+    if (mt != null) {
+      md.add(mt);
+    }
+  }
+
+  /**
+   * Reads columns {@code [from, to)} of {@code row} as doubles, with {@code null} for null cells.
+   *
+   * @return the values, or {@code null} if every cell in the range is null
+   */
+  private static ArrayList<Double> readTuple(Row row, int from, int to) throws Exception {
+    ArrayList<Double> tuple = new ArrayList<>();
+    boolean containsNotNull = false;
+    for (int i = from; i < to; i++) {
+      if (row.isNull(i)) {
+        tuple.add(null);
+      } else {
+        containsNotNull = true;
+        // NOTE(review): BigDecimal.valueOf throws NumberFormatException for NaN and infinite
+        // values, so such cells abort the whole query.
+        tuple.add(BigDecimal.valueOf(getValueAsDouble(row, i)).doubleValue());
+      }
+    }
+    return containsNotNull ? tuple : null;
+  }
+
+  /**
+   * Reads column {@code index} of {@code row} as a double.
+   *
+   * @throws Exception if the column is not INT32, INT64, FLOAT or DOUBLE, or reading it fails
+   */
+  public static double getValueAsDouble(Row row, int index) throws Exception {
+    double ans;
+    try {
+      switch (row.getDataType(index)) {
+        case INT32:
+          ans = row.getInt(index);
+          break;
+        case INT64:
+          ans = row.getLong(index);
+          break;
+        case FLOAT:
+          ans = row.getFloat(index);
+          break;
+        case DOUBLE:
+          ans = row.getDouble(index);
+          break;
+        default:
+          throw new Exception("The value of the input time series is not numeric.\n");
+      }
+    } catch (IOException e) {
+      throw new Exception("Fail to get data type in row " + row.getTime(), e);
+    }
+    return ans;
+  }
+
+  /** Builds the KD tree over the master tuples. */
+  public void buildKDTree() {
+    this.kdTreeUtil = KDTreeUtil.build(md, this.columnCnt);
+  }
+
+  /** Returns column {@code columnPos} (1-based) of every repaired tuple, in time order. */
+  public ArrayList<Double> getCleanResultColumn(int columnPos) {
+    ArrayList<Double> column = new ArrayList<>();
+    for (ArrayList<Double> tuple : this.tdCleaned) {
+      column.add(tuple.get(columnPos - 1));
+    }
+    return column;
+  }
+
+  /** Returns the timestamps of the time-series tuples (the internal list, not a copy). */
+  public ArrayList<Long> getTime() {
+    return tdTime;
+  }
+
+  /**
+   * Returns the Euclidean distance over the first {@code columnCnt} coordinates of the two
+   * tuples, each difference divided by that column's standard deviation. Both tuples must be
+   * non-null in those coordinates.
+   */
+  public double getTmDistance(ArrayList<Double> tTuple, ArrayList<Double> mTuple) {
+    double distance = 0d;
+    for (int pos = 0; pos < columnCnt; pos++) {
+      double temp = (tTuple.get(pos) - mTuple.get(pos)) / std[pos];
+      distance += temp * temp;
+    }
+    return Math.sqrt(distance);
+  }
+
+  /**
+   * Returns the indexes of the earlier tuples whose timestamp lies within {@code omega} of tuple
+   * {@code i}, nearest first.
+   */
+  public ArrayList<Integer> calW(int i) {
+    ArrayList<Integer> window = new ArrayList<>();
+    for (int l = i - 1; l >= 0; l--) {
+      // Timestamps are assumed to be ascending (the eta estimation in setParameters relies on
+      // the same ordering), so once one tuple falls outside the window every earlier one does.
+      if (this.tdTime.get(i) > this.tdTime.get(l) + omega) {
+        break;
+      }
+      window.add(l);
+    }
+    return window;
+  }
+
+  /**
+   * Returns the repair candidates for tuple {@code i}. If {@code Wi} is empty, the result is the
+   * single nearest master tuple. Otherwise it is the {@code k} nearest master tuples of tuple
+   * {@code i}, plus the {@code k} nearest of every cleaned tuple in {@code Wi}.
+   */
+  public ArrayList<ArrayList<Double>> calC(int i, ArrayList<Integer> Wi) {
+    ArrayList<ArrayList<Double>> Ci = new ArrayList<>();
+    if (Wi.size() == 0) {
+      Ci.add(this.kdTreeUtil.query(this.td.get(i), std));
+    } else {
+      Ci.addAll(this.kdTreeUtil.queryKNN(this.td.get(i), k, std));
+      for (Integer integer : Wi) {
+        Ci.addAll(this.kdTreeUtil.queryKNN(this.tdCleaned.get(integer), k, std));
+      }
+    }
+    return Ci;
+  }
+
+  /**
+   * Repairs every time-series tuple in order. Each tuple is replaced by the candidate closest to
+   * it among those within {@code eta} of every cleaned tuple in its window, and the result is
+   * appended to {@link #tdCleaned}.
+   */
+  public void masterRepair() {
+    for (int i = 0; i < this.td.size(); i++) {
+      ArrayList<Double> tuple = this.td.get(i);
+      ArrayList<Integer> window = calW(i);
+      ArrayList<ArrayList<Double>> candidates = this.calC(i, window);
+      double minDis = Double.MAX_VALUE;
+      ArrayList<Double> repairTuple = null;
+      double nearestDis = Double.MAX_VALUE;
+      ArrayList<Double> nearest = null;
+      for (ArrayList<Double> ci : candidates) {
+        double dis = getTmDistance(ci, tuple);
+        if (dis < nearestDis) {
+          nearestDis = dis;
+          nearest = ci;
+        }
+        if (dis < minDis && isSmooth(ci, window)) {
+          minDis = dis;
+          repairTuple = ci;
+        }
+      }
+      if (repairTuple == null) {
+        // No candidate satisfies the smoothness constraint, or there are no candidates.
+        // Appending an empty tuple here would make later get(pos) calls throw
+        // IndexOutOfBoundsException, so fall back to the nearest candidate, else keep the tuple.
+        repairTuple = nearest != null ? nearest : tuple;
+      }
+      this.tdCleaned.add(repairTuple);
+    }
+  }
+
+  /** Returns whether {@code candidate} is within {@code eta} of every cleaned tuple in window. */
+  private boolean isSmooth(ArrayList<Double> candidate, ArrayList<Integer> window) {
+    for (Integer l : window) {
+      if (getTmDistance(candidate, this.tdCleaned.get(l)) > eta) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Estimates every parameter left unset in the constructor:
+   *
+   * <ul>
+   *   <li>{@code omega} (when {@code -1}) is ten times the median interval between consecutive
+   *       timestamps.
+   *   <li>{@code eta} (when {@code NaN}) is the distance at the 99.73% position of the sorted
+   *       distances between tuples sharing a window.
+   *   <li>{@code k} (when {@code -1}) is the smallest value in {@code [2, 5]} whose
+   *       90%-position neighbour distance exceeds {@code eta}, else {@code min(5, #master
+   *       tuples)}.
+   * </ul>
+   *
+   * <p>Needs {@link #std} and the KD tree to be ready.
+   */
+  public void setParameters() {
+    if (omega == -1) {
+      omega = estimateOmega();
+    }
+    if (Double.isNaN(eta)) {
+      eta = estimateEta();
+    }
+    if (k == -1) {
+      k = estimateK();
+    }
+  }
+
+  /** Returns ten times the median timestamp interval, or 0 when there are fewer than 2 tuples. */
+  private long estimateOmega() {
+    ArrayList<Long> intervals = getIntervals();
+    if (intervals.isEmpty()) {
+      // With fewer than two tuples no window can hold a neighbour, so the value is irrelevant.
+      return 0;
+    }
+    Collections.sort(intervals);
+    return intervals.get(intervals.size() / 2) * 10;
+  }
+
+  /**
+   * Returns the distance at the 99.73% position among tuple pairs sharing a window, or +Infinity
+   * (no constraint) when there is no such pair.
+   */
+  private double estimateEta() {
+    ArrayList<Double> distanceList = new ArrayList<>();
+    for (int i = 1; i < this.td.size(); i++) {
+      for (int l = i - 1; l >= 0; l--) {
+        if (this.tdTime.get(i) > this.tdTime.get(l) + omega) {
+          break;
+        }
+        distanceList.add(getTmDistance(this.td.get(i), this.td.get(l)));
+      }
+    }
+    if (distanceList.isEmpty()) {
+      return Double.POSITIVE_INFINITY;
+    }
+    Collections.sort(distanceList);
+    return distanceList.get((int) (distanceList.size() * 0.9973));
+  }
+
+  /** Returns the estimated neighbour count; see {@link #setParameters()}. */
+  private int estimateK() {
+    for (int tempK = 2; tempK <= 5; tempK++) {
+      ArrayList<Double> distanceList = new ArrayList<>();
+      for (ArrayList<Double> tuple : this.td) {
+        for (ArrayList<Double> neighbor : this.kdTreeUtil.queryKNN(tuple, tempK, std)) {
+          distanceList.add(getTmDistance(tuple, neighbor));
+        }
+      }
+      if (distanceList.isEmpty()) {
+        continue; // nothing to take a percentile of
+      }
+      Collections.sort(distanceList);
+      if (distanceList.get((int) (distanceList.size() * 0.9)) > eta) {
+        return tempK;
+      }
+    }
+    return Integer.min(5, this.md.size());
+  }
+
+  /** Returns the population variance of the non-NaN entries of {@code value}, or 0 if none. */
+  private double varianceImperative(double[] value) {
+    double average = 0.0;
+    int cnt = 0;
+    for (double p : value) {
+      if (!Double.isNaN(p)) {
+        cnt += 1;
+        average += p;
+      }
+    }
+    if (cnt == 0) {
+      return 0d;
+    }
+    average /= cnt;
+
+    double variance = 0.0;
+    for (double p : value) {
+      if (!Double.isNaN(p)) {
+        variance += (p - average) * (p - average);
+      }
+    }
+    return variance / cnt;
+  }
+
+  /** Returns column {@code pos} of {@link #td}; the column must contain no nulls. */
+  private double[] getColumn(int pos) {
+    double[] column = new double[this.td.size()];
+    for (int i = 0; i < this.td.size(); i++) {
+      column[i] = this.td.get(i).get(pos);
+    }
+    return column;
+  }
+
+  /** Computes the per-column standard deviation of the time-series tuples into {@link #std}. */
+  public void callStd() {
+    this.std = new double[this.columnCnt];
+    for (int i = 0; i < this.columnCnt; i++) {
+      double deviation = Math.sqrt(varianceImperative(getColumn(i)));
+      // A constant column has zero deviation. Dividing by it turns every distance into NaN (0/0)
+      // or Infinity, so no candidate could ever be selected; fall back to unit scale instead.
+      std[i] = deviation > 0 ? deviation : 1d;
+    }
+  }
+
+  /** Runs the whole pipeline; afterwards {@link #getCleanResultColumn(int)} holds the result. */
+  public void repair() {
+    if (this.td.isEmpty()) {
+      // Nothing to repair; the steps below all need at least one time-series tuple.
+      return;
+    }
+    fillNullValue();
+    buildKDTree();
+    callStd();
+    setParameters();
+    masterRepair();
+  }
+
+  /** Returns the differences between consecutive timestamps of the time-series tuples. */
+  public ArrayList<Long> getIntervals() {
+    ArrayList<Long> intervals = new ArrayList<>();
+    for (int i = 1; i < this.tdTime.size(); i++) {
+      intervals.add(this.tdTime.get(i) - this.tdTime.get(i - 1));
+    }
+    return intervals;
+  }
+
+  /**
+   * Replaces null cells of the time-series tuples with the previous non-null value of the same
+   * column. Leading nulls take the column's first non-null value; a column with no value at all
+   * is filled with NaN.
+   */
+  public void fillNullValue() {
+    for (int i = 0; i < columnCnt; i++) {
+      // Seed with the first non-null value instead of blindly unboxing td.get(0).get(i), which
+      // threw NullPointerException when the first row had a null in this column.
+      double temp = Double.NaN;
+      for (ArrayList<Double> tuple : this.td) {
+        if (tuple.get(i) != null) {
+          temp = tuple.get(i);
+          break;
+        }
+      }
+      for (ArrayList<Double> arrayList : this.td) {
+        if (arrayList.get(i) == null) {
+          arrayList.set(i, temp);
+        } else {
+          temp = arrayList.get(i);
+        }
+      }
+    }
+  }
+}