You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/28 14:55:55 UTC

[iotdb] branch research/timeseries-master-data created (now b00b77b75b)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch research/timeseries-master-data
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at b00b77b75b Research/timeseries master data (#6890)

This branch includes the following new commits:

     new b00b77b75b Research/timeseries master data (#6890)

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Research/timeseries master data (#6890)

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch research/timeseries-master-data
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b00b77b75b0217d371b67aa90465880e35519b3f
Author: Husaimawx <44...@users.noreply.github.com>
AuthorDate: Thu Aug 4 10:38:10 2022 +0800

    Research/timeseries master data (#6890)
    
    * add new udf function(MasterRepair)
    
    * add docs of new udf function(MasterRepair)
---
 docs/UserGuide/UDF-Library/Data-Repairing.md       |  61 ++++
 docs/zh/UserGuide/UDF-Library/Data-Repairing.md    |  62 ++++
 .../iotdb/library/drepair/UDTFMasterRepair.java    |  79 +++++
 .../iotdb/library/drepair/util/KDTreeUtil.java     | 336 +++++++++++++++++++++
 .../library/drepair/util/MasterRepairUtil.java     | 260 ++++++++++++++++
 5 files changed, 798 insertions(+)

diff --git a/docs/UserGuide/UDF-Library/Data-Repairing.md b/docs/UserGuide/UDF-Library/Data-Repairing.md
index a6832fa244..2c1740e0c7 100644
--- a/docs/UserGuide/UDF-Library/Data-Repairing.md
+++ b/docs/UserGuide/UDF-Library/Data-Repairing.md
@@ -352,4 +352,65 @@ Output series:
 |2020-01-01T00:00:28.000+08:00|                                            126.0|
 |2020-01-01T00:00:30.000+08:00|                                            128.0|
 +-----------------------------+-------------------------------------------------+
+```
+
+## MasterRepair
+
+### Usage
+
+This function is used to clean time series with master data.
+
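+**Name**: MasterRepair
+
+**Input Series:** Support multiple input series. The types are INT32 / INT64 / FLOAT / DOUBLE. The first half of the input series are the time series columns to be repaired, and the second half are the corresponding master data columns.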
+
+**Parameters:**
+
++ `omega`: The window size. It is a non-negative integer whose unit is millisecond. By default, it will be estimated according to the distances of two tuples with various time differences.
++ `eta`: The distance threshold. It is a positive number. By default, it will be estimated according to the distance distribution of tuples in windows.
++ `k`: The number of neighbors in master data. It is a positive integer. By default, it will be estimated according to the tuple distance of the k-th nearest neighbor in the master data.
++ `output_column`: The repaired column to output, defaults to 1 which means output the repair result of the first column.
+
+**Output Series:** Output a single series. The type is the same as the input. This series is the input after repairing.
+
+### Examples
+
+Input series:
+
+```
++-----------------------------+------------+------------+------------+------------+------------+------------+
+|                         Time|root.test.t1|root.test.t2|root.test.t3|root.test.m1|root.test.m2|root.test.m3|
++-----------------------------+------------+------------+------------+------------+------------+------------+
+|2021-07-01T12:00:01.000+08:00|        1704|     1154.55|       0.195|        1704|     1154.55|       0.195|
+|2021-07-01T12:00:02.000+08:00|        1702|     1152.30|       0.193|        1702|     1152.30|       0.193|
+|2021-07-01T12:00:03.000+08:00|        1702|     1148.65|       0.192|        1702|     1148.65|       0.192|
+|2021-07-01T12:00:04.000+08:00|        1701|     1145.20|       0.194|        1701|     1145.20|       0.194|
+|2021-07-01T12:00:07.000+08:00|        1703|     1150.55|       0.195|        1703|     1150.55|       0.195|
+|2021-07-01T12:00:08.000+08:00|        1694|     1151.55|       0.193|        1704|     1151.55|       0.193|
+|2021-07-01T12:01:09.000+08:00|        1705|     1153.55|       0.194|        1705|     1153.55|       0.194|
+|2021-07-01T12:01:10.000+08:00|        1706|     1152.30|       0.190|        1706|     1152.30|       0.190|
++-----------------------------+------------+------------+------------+------------+------------+------------+
+```
+
+SQL for query:
+
+```sql
+select MasterRepair(t1,t2,t3,m1,m2,m3) from root.test
+```
+
+Output series:
+
+
+```
++-----------------------------+-------------------------------------------------------------------------------------------+
+|                         Time|MasterRepair(root.test.t1,root.test.t2,root.test.t3,root.test.m1,root.test.m2,root.test.m3)|
++-----------------------------+-------------------------------------------------------------------------------------------+
+|2021-07-01T12:00:01.000+08:00|                                                                                       1704|
+|2021-07-01T12:00:02.000+08:00|                                                                                       1702|
+|2021-07-01T12:00:03.000+08:00|                                                                                       1702|
+|2021-07-01T12:00:04.000+08:00|                                                                                       1701|
+|2021-07-01T12:00:07.000+08:00|                                                                                       1703|
+|2021-07-01T12:00:08.000+08:00|                                                                                       1704|
+|2021-07-01T12:01:09.000+08:00|                                                                                       1705|
+|2021-07-01T12:01:10.000+08:00|                                                                                       1706|
++-----------------------------+-------------------------------------------------------------------------------------------+
 ```
\ No newline at end of file
diff --git a/docs/zh/UserGuide/UDF-Library/Data-Repairing.md b/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
index e75d7c1782..c5d92fbfdd 100644
--- a/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
+++ b/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
@@ -343,4 +343,66 @@ select valuerepair(s1,'method'='LsGreedy') from root.test.d2
 |2020-01-01T00:00:28.000+08:00|                                            126.0|
 |2020-01-01T00:00:30.000+08:00|                                            128.0|
 +-----------------------------+-------------------------------------------------+
+```
+
+## MasterRepair
+
+### 函数简介
+
+本函数实现基于主数据的时间序列数据修复。
+
+**函数名:**MasterRepair
+
+**输入序列:** 支持多个输入序列,类型为 INT32 / INT64 / FLOAT / DOUBLE。
+
+**参数:**
+
+- `omega`:算法窗口大小,非负整数(单位为毫秒), 在缺省情况下,算法根据不同时间差下的两个元组距离自动估计该参数。
+- `eta`:算法距离阈值,正数, 在缺省情况下,算法根据窗口中元组的距离分布自动估计该参数。
+- `k`:主数据中的近邻数量,正整数, 在缺省情况下,算法根据主数据中的k个近邻的元组距离自动估计该参数。
+- `output_column`:输出列的序号,默认输出第一列的修复结果。
+
+**输出序列:**输出单个序列,类型与输入数据中对应列的类型相同,序列为输入列修复后的结果。
+
+### 使用示例
+
+输入序列:
+
+```
++-----------------------------+------------+------------+------------+------------+------------+------------+
+|                         Time|root.test.t1|root.test.t2|root.test.t3|root.test.m1|root.test.m2|root.test.m3|
++-----------------------------+------------+------------+------------+------------+------------+------------+
+|2021-07-01T12:00:01.000+08:00|        1704|     1154.55|       0.195|        1704|     1154.55|       0.195|
+|2021-07-01T12:00:02.000+08:00|        1702|     1152.30|       0.193|        1702|     1152.30|       0.193|
+|2021-07-01T12:00:03.000+08:00|        1702|     1148.65|       0.192|        1702|     1148.65|       0.192|
+|2021-07-01T12:00:04.000+08:00|        1701|     1145.20|       0.194|        1701|     1145.20|       0.194|
+|2021-07-01T12:00:07.000+08:00|        1703|     1150.55|       0.195|        1703|     1150.55|       0.195|
+|2021-07-01T12:00:08.000+08:00|        1694|     1151.55|       0.193|        1704|     1151.55|       0.193|
+|2021-07-01T12:01:09.000+08:00|        1705|     1153.55|       0.194|        1705|     1153.55|       0.194|
+|2021-07-01T12:01:10.000+08:00|        1706|     1152.30|       0.190|        1706|     1152.30|       0.190|
++-----------------------------+------------+------------+------------+------------+------------+------------+
+```
+
+用于查询的 SQL 语句:
+
+```sql
+select MasterRepair(t1,t2,t3,m1,m2,m3) from root.test
+```
+
+输出序列:
+
+
+```
++-----------------------------+-------------------------------------------------------------------------------------------+
+|                         Time|MasterRepair(root.test.t1,root.test.t2,root.test.t3,root.test.m1,root.test.m2,root.test.m3)|
++-----------------------------+-------------------------------------------------------------------------------------------+
+|2021-07-01T12:00:01.000+08:00|                                                                                       1704|
+|2021-07-01T12:00:02.000+08:00|                                                                                       1702|
+|2021-07-01T12:00:03.000+08:00|                                                                                       1702|
+|2021-07-01T12:00:04.000+08:00|                                                                                       1701|
+|2021-07-01T12:00:07.000+08:00|                                                                                       1703|
+|2021-07-01T12:00:08.000+08:00|                                                                                       1704|
+|2021-07-01T12:01:09.000+08:00|                                                                                       1705|
+|2021-07-01T12:01:10.000+08:00|                                                                                       1706|
++-----------------------------+-------------------------------------------------------------------------------------------+
 ```
\ No newline at end of file
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFMasterRepair.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFMasterRepair.java
new file mode 100644
index 0000000000..07e7eae093
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFMasterRepair.java
@@ -0,0 +1,79 @@
+package org.apache.iotdb.library.drepair;
+
+import org.apache.iotdb.library.drepair.util.MasterRepairUtil;
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.ArrayList;
+
+/**
+ * UDTF that repairs time series columns with the help of master data columns.
+ *
+ * <p>The input series are split in half: the first half is handed to {@link MasterRepairUtil} as
+ * the time series tuple and the second half as the master data tuple. Rows are buffered in
+ * {@link #transform}, the repair itself runs once in {@link #terminate}, and one repaired column
+ * (selected by the {@code output_column} attribute, 1-based) is emitted as DOUBLE.
+ */
+public class UDTFMasterRepair implements UDTF {
+  private MasterRepairUtil masterRepairUtil;
+  // 1-based index of the repaired column that is written to the collector
+  private int outputColumn;
+
+  /**
+   * Checks the data type of every input series and the range of the optional attributes
+   * {@code omega}, {@code eta}, {@code k} and {@code output_column}.
+   *
+   * @param validator validator supplied by the UDF framework
+   * @throws Exception if a series has an unsupported type or an attribute is out of range
+   */
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    UDFParameters parameters = validator.getParameters();
+    // The loop bound must be the number of input series; the attribute map only holds the
+    // key/value parameters and has nothing to do with how many series were passed in.
+    int seriesCnt = parameters.getDataTypes().size();
+    for (int i = 0; i < seriesCnt; i++) {
+      validator.validateInputSeriesDataType(i, Type.DOUBLE, Type.FLOAT, Type.INT32, Type.INT64);
+    }
+    if (parameters.hasAttribute("omega")) {
+      // omega is consumed as a long in beforeStart, so it is validated as a long as well
+      validator.validate(
+          omega -> (long) omega >= 0,
+          "Parameter omega should be non-negative.",
+          parameters.getLong("omega"));
+    }
+    if (parameters.hasAttribute("eta")) {
+      validator.validate(
+          eta -> (double) eta > 0,
+          "Parameter eta should be larger than 0.",
+          parameters.getDouble("eta"));
+    }
+    if (parameters.hasAttribute("k")) {
+      validator.validate(
+          k -> (int) k > 0,
+          "Parameter k should be a positive integer.",
+          parameters.getInt("k"));
+    }
+    if (parameters.hasAttribute("output_column")) {
+      // same split as in beforeStart: the first half of the series are the repairable columns
+      final int columnCnt = seriesCnt / 2;
+      validator.validate(
+          output_column -> (int) output_column > 0,
+          "Parameter output_column should be a positive integer.",
+          parameters.getInt("output_column"));
+      // reject an index past the last repairable column here instead of failing in terminate
+      validator.validate(
+          output_column -> (int) output_column <= columnCnt,
+          "Parameter output_column should not exceed the number of time series columns.",
+          parameters.getInt("output_column"));
+    }
+  }
+
+  /**
+   * Configures row-by-row access with DOUBLE output and creates the repair helper.
+   *
+   * <p>Missing attributes are passed on as sentinels ({@code -1} for {@code omega} and {@code k},
+   * {@code NaN} for {@code eta}); {@link MasterRepairUtil} estimates those values itself.
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    configurations.setAccessStrategy(new RowByRowAccessStrategy());
+    configurations.setOutputDataType(Type.DOUBLE);
+    int columnCnt = parameters.getDataTypes().size() / 2;
+    long omega = parameters.getLongOrDefault("omega", -1);
+    double eta = parameters.getDoubleOrDefault("eta", Double.NaN);
+    int k = parameters.getIntOrDefault("k", -1);
+    masterRepairUtil = new MasterRepairUtil(columnCnt, omega, eta, k);
+    outputColumn = parameters.getIntOrDefault("output_column", 1);
+  }
+
+  /** Buffers the row unless every column of it is null; nothing is emitted here. */
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (!masterRepairUtil.isNullRow(row)) {
+      masterRepairUtil.addRow(row);
+    }
+  }
+
+  /** Runs the repair over all buffered rows and emits the selected repaired column. */
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    masterRepairUtil.repair();
+    ArrayList<Long> times = masterRepairUtil.getTime();
+    ArrayList<Double> column = masterRepairUtil.getCleanResultColumn(this.outputColumn);
+    for (int i = 0; i < column.size(); i++) {
+      collector.putDouble(times.get(i), column.get(i));
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/KDTreeUtil.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/KDTreeUtil.java
new file mode 100644
index 0000000000..49f2c48da0
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/KDTreeUtil.java
@@ -0,0 +1,336 @@
+package org.apache.iotdb.library.drepair.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Stack;
+
+import static java.lang.Math.min;
+import static java.lang.Math.sqrt;
+
+/**
+ * A k-d tree over tuples of {@code Double} values, used to look up master data tuples that are
+ * close to a given tuple.
+ *
+ * <p>All distances are Euclidean distances in which the difference along dimension {@code i} is
+ * divided by {@code std[i]} before squaring.
+ */
+public class KDTreeUtil {
+  private Node kdtree;
+
+  private static class Node {
+    // dimension this node splits on
+    int partitiondimension;
+    // value this node splits at
+    double partitionValue;
+    // the stored tuple for a leaf; null for an inner node
+    ArrayList<Double> value;
+    // whether this node is a leaf
+    boolean isLeaf = false;
+    // left subtree
+    Node left;
+    // right subtree
+    Node right;
+    // per-dimension minimum of the tuples below this node (inner nodes only)
+    ArrayList<Double> min;
+    // per-dimension maximum of the tuples below this node (inner nodes only)
+    ArrayList<Double> max;
+  }
+
+  /**
+   * Builds a tree from the given tuples.
+   *
+   * @param input tuples to index; only the first {@code dimension} entries of each are used
+   * @param dimension number of dimensions considered for splitting and bounding boxes
+   * @return the tree; for empty input the root is neither a leaf nor has children
+   */
+  public static KDTreeUtil build(ArrayList<ArrayList<Double>> input, int dimension) {
+    KDTreeUtil tree = new KDTreeUtil();
+    tree.kdtree = new Node();
+    tree.buildDetail(tree.kdtree, input, dimension);
+    return tree;
+  }
+
+  /** Recursively fills {@code node} so that it represents the tuples in {@code data}. */
+  private void buildDetail(Node node, ArrayList<ArrayList<Double>> data, int dimensions) {
+    if (data.size() == 0) {
+      return;
+    }
+    if (data.size() == 1) {
+      node.isLeaf = true;
+      node.value = data.get(0);
+      return;
+    }
+    // split on the dimension with the largest variance
+    node.partitiondimension = -1;
+    double var = -1;
+    for (int i = 0; i < dimensions; i++) {
+      double tmpvar = UtilZ.variance(data, i);
+      if (tmpvar > var) {
+        var = tmpvar;
+        node.partitiondimension = i;
+      }
+    }
+    // zero variance means all tuples are identical, so a single leaf represents them
+    if (var == 0d) {
+      node.isLeaf = true;
+      node.value = data.get(0);
+      return;
+    }
+
+    // split at the median of the chosen dimension
+    node.partitionValue = UtilZ.median(data, node.partitiondimension);
+
+    ArrayList<ArrayList<Double>> maxmin = UtilZ.maxmin(data, dimensions);
+    node.min = maxmin.get(0);
+    node.max = maxmin.get(1);
+
+    ArrayList<ArrayList<Double>> left = new ArrayList<>();
+    ArrayList<ArrayList<Double>> right = new ArrayList<>();
+
+    for (ArrayList<Double> d : data) {
+      if (d.get(node.partitiondimension) < node.partitionValue) {
+        left.add(d);
+      } else if (d.get(node.partitiondimension) > node.partitionValue) {
+        right.add(d);
+      }
+    }
+    // Tuples that sit exactly on the split value go right, except that the first one goes left
+    // when nothing is smaller than the split value; this keeps both children non-empty.
+    for (ArrayList<Double> d : data) {
+      if (d.get(node.partitiondimension) == node.partitionValue) {
+        if (left.size() == 0) {
+          left.add(d);
+        } else {
+          right.add(d);
+        }
+      }
+    }
+
+    Node leftnode = new Node();
+    Node rightnode = new Node();
+    node.left = leftnode;
+    node.right = rightnode;
+    buildDetail(leftnode, left, dimensions);
+    buildDetail(rightnode, right, dimensions);
+  }
+
+  /**
+   * Walks from {@code node} down to a leaf on the side of each split that {@code input} falls on,
+   * pushing the sibling that was not taken onto {@code stack} at every step.
+   */
+  private static Node descendToLeaf(Node node, ArrayList<Double> input, Stack<Node> stack) {
+    while (!node.isLeaf) {
+      if (input.get(node.partitiondimension) < node.partitionValue) {
+        stack.push(node.right);
+        node = node.left;
+      } else {
+        stack.push(node.left);
+        node = node.right;
+      }
+    }
+    return node;
+  }
+
+  /**
+   * Returns the stored tuple with the smallest scaled distance to {@code input}.
+   *
+   * @param input query tuple
+   * @param std per-dimension scale applied to coordinate differences
+   */
+  public ArrayList<Double> query(ArrayList<Double> input, double[] std) {
+    Stack<Node> stack = new Stack<>();
+    Node node = descendToLeaf(kdtree, input, stack);
+
+    double distance = UtilZ.distance(input, node.value, std);
+    ArrayList<Double> nearest = queryRec(input, distance, stack, std);
+    return nearest == null ? node.value : nearest;
+  }
+
+  /**
+   * Processes the pending subtrees on {@code stack} looking for a tuple closer than
+   * {@code distance}.
+   *
+   * @return the closest tuple found that beats {@code distance}, or {@code null} if there is none
+   */
+  public ArrayList<Double> queryRec(
+      ArrayList<Double> input, double distance, Stack<Node> stack, double[] std) {
+    ArrayList<Double> nearest = null;
+    while (stack.size() != 0) {
+      Node node = stack.pop();
+      if (!node.isLeaf) {
+        // skip the subtree when even its bounding box is not closer than the current best
+        double mindistance = UtilZ.mindistance(input, node.max, node.min, std);
+        if (!(mindistance < distance)) {
+          continue;
+        }
+        node = descendToLeaf(node, input, stack);
+      }
+      double tdis = UtilZ.distance(input, node.value, std);
+      if (tdis < distance) {
+        distance = tdis;
+        nearest = node.value;
+      }
+    }
+    return nearest;
+  }
+
+  /**
+   * Same traversal as {@link #queryRec}, but every tuple that improves on the current best
+   * distance is collected instead of only the last one.
+   *
+   * <p>NOTE(review): because the threshold shrinks with each hit, the result is the sequence of
+   * successive improvements rather than an exhaustive neighbour set - confirm this is intended.
+   */
+  public ArrayList<ArrayList<Double>> queryRecKNN(
+      ArrayList<Double> input, double distance, Stack<Node> stack, double[] std) {
+    ArrayList<ArrayList<Double>> nearest = new ArrayList<>();
+    while (stack.size() != 0) {
+      Node node = stack.pop();
+      if (!node.isLeaf) {
+        /*
+         * mindistance is the smallest possible distance from the query to the hyper-rectangle
+         * covered by this node. Only when mindistance < distance can a closer tuple exist
+         * below this node; otherwise the subtree is skipped.
+         */
+        double mindistance = UtilZ.mindistance(input, node.max, node.min, std);
+        if (!(mindistance < distance)) {
+          continue;
+        }
+        node = descendToLeaf(node, input, stack);
+      }
+      double tdis = UtilZ.distance(input, node.value, std);
+      if (tdis < distance) {
+        distance = tdis;
+        nearest.add(node.value);
+      }
+    }
+    return nearest;
+  }
+
+  /**
+   * Removes and returns the tuple of {@code nearest} that is closest to {@code input}.
+   *
+   * @param nearest candidate list; must not be empty, and is shortened by one element
+   */
+  public ArrayList<Double> findNearest(
+      ArrayList<Double> input, ArrayList<ArrayList<Double>> nearest, double[] std) {
+    double min_dis = Double.MAX_VALUE;
+    int min_index = 0;
+    for (int i = 0; i < nearest.size(); i++) {
+      double dis = UtilZ.distance(input, nearest.get(i), std);
+      if (dis < min_dis) {
+        min_dis = dis;
+        min_index = i;
+      }
+    }
+    ArrayList<Double> nt = nearest.get(min_index);
+    nearest.remove(min_index);
+    return nt;
+  }
+
+  /**
+   * Returns up to {@code k} candidate neighbours of {@code input}, closest first.
+   *
+   * <p>The candidates are those found by {@link #queryRecKNN}; if it finds none, the leaf reached
+   * by the initial descent is returned as the only element.
+   */
+  public ArrayList<ArrayList<Double>> queryKNN(ArrayList<Double> input, int k, double[] std) {
+    ArrayList<ArrayList<Double>> kNearest = new ArrayList<>();
+    Stack<Node> stack = new Stack<>();
+    Node node = descendToLeaf(kdtree, input, stack);
+    double distance = UtilZ.distance(input, node.value, std);
+    ArrayList<ArrayList<Double>> nearest = queryRecKNN(input, distance, stack, std);
+    // findNearest shrinks the candidate list, so the number to take is fixed before the loop
+    int wanted = Math.min(k, nearest.size());
+    for (int i = 0; i < wanted; i++) {
+      kNearest.add(findNearest(input, nearest, std));
+    }
+    if (kNearest.size() == 0) {
+      kNearest.add(node.value);
+    }
+    return kNearest;
+  }
+
+  /** A tuple paired with a distance; instances order by ascending distance. */
+  public static class TupleWithDistance implements Comparable<TupleWithDistance> {
+    private final Double distance;
+    private final ArrayList<Double> tuple;
+
+    public TupleWithDistance(Double distance, ArrayList<Double> tuple) {
+      this.distance = distance;
+      this.tuple = tuple;
+    }
+
+    @Override
+    public int compareTo(TupleWithDistance t) {
+      return this.distance.compareTo(t.distance);
+    }
+
+    public Double getDistance() {
+      return distance;
+    }
+
+    public ArrayList<Double> getTuple() {
+      return tuple;
+    }
+  }
+
+  /** Numeric helpers used while building and querying the tree. */
+  private static class UtilZ {
+
+    /** Population variance of the given dimension over all tuples. */
+    static double variance(ArrayList<ArrayList<Double>> data, int dimension) {
+      double sum = 0d;
+      for (ArrayList<Double> d : data) {
+        sum += d.get(dimension);
+      }
+      double avg = sum / data.size();
+      double ans = 0d;
+      for (ArrayList<Double> d : data) {
+        double temp = d.get(dimension) - avg;
+        ans += temp * temp;
+      }
+      return ans / data.size();
+    }
+
+    /** Element at index {@code size / 2} of the sorted values of the given dimension. */
+    static double median(ArrayList<ArrayList<Double>> data, int dimension) {
+      ArrayList<Double> d = new ArrayList<>();
+      for (ArrayList<Double> k : data) {
+        d.add(k.get(dimension));
+      }
+      Collections.sort(d);
+      int pos = d.size() / 2;
+      return d.get(pos);
+    }
+
+    /**
+     * Computes the bounding box of {@code data}.
+     *
+     * @return a two-element list: index 0 holds the per-dimension minima, index 1 the maxima
+     */
+    static ArrayList<ArrayList<Double>> maxmin(ArrayList<ArrayList<Double>> data, int dimensions) {
+      ArrayList<ArrayList<Double>> mm = new ArrayList<>();
+      ArrayList<Double> min_v = new ArrayList<>();
+      ArrayList<Double> max_v = new ArrayList<>();
+      for (int i = 0; i < dimensions; i++) {
+        // Seed with infinities: Double.MIN_VALUE is the smallest positive double, not the most
+        // negative one, so it cannot serve as the start value of a running maximum.
+        double min_temp = Double.POSITIVE_INFINITY;
+        double max_temp = Double.NEGATIVE_INFINITY;
+        // every tuple, including the first, takes part in both the minimum and the maximum
+        for (ArrayList<Double> d : data) {
+          double v = d.get(i);
+          if (v < min_temp) {
+            min_temp = v;
+          }
+          if (v > max_temp) {
+            max_temp = v;
+          }
+        }
+        min_v.add(min_temp);
+        max_v.add(max_temp);
+      }
+      mm.add(min_v);
+      mm.add(max_v);
+      return mm;
+    }
+
+    /** Scaled Euclidean distance; dimensions where either side is null are skipped. */
+    static double distance(ArrayList<Double> a, ArrayList<Double> b, double[] std) {
+      double sum = 0d;
+      for (int i = 0; i < a.size(); i++) {
+        if (a.get(i) != null && b.get(i) != null) {
+          sum += Math.pow((a.get(i) - b.get(i)) / std[i], 2);
+        }
+      }
+      return Math.sqrt(sum);
+    }
+
+    /** Scaled distance from {@code a} to the box spanned by {@code min} and {@code max}. */
+    static double mindistance(
+        ArrayList<Double> a, ArrayList<Double> max, ArrayList<Double> min, double[] std) {
+      double sum = 0d;
+      for (int i = 0; i < a.size(); i++) {
+        if (a.get(i) > max.get(i)) {
+          sum += Math.pow((a.get(i) - max.get(i)) / std[i], 2);
+        } else if (a.get(i) < min.get(i)) {
+          sum += Math.pow((min.get(i) - a.get(i)) / std[i], 2);
+        }
+      }
+      return Math.sqrt(sum);
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MasterRepairUtil.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MasterRepairUtil.java
new file mode 100644
index 0000000000..f99a81a731
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MasterRepairUtil.java
@@ -0,0 +1,260 @@
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.udf.api.access.Row;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Buffers time series tuples and master data tuples and repairs the former with the latter.
+ *
+ * <p>Each incoming row is split at {@code columnCnt}: the first {@code columnCnt} columns form a
+ * time series tuple, the remaining columns a master data tuple. {@link #repair()} replaces every
+ * time series tuple by a master data tuple found through a k-d tree lookup, subject to a
+ * smoothness check against already repaired tuples inside a time window.
+ */
+public class MasterRepairUtil {
+  // time series tuples as read (nulls are filled in by fillNullValue)
+  private final ArrayList<ArrayList<Double>> td = new ArrayList<>();
+  // repaired tuples, index-aligned with td once master_repair has run
+  private final ArrayList<ArrayList<Double>> td_cleaned = new ArrayList<>();
+  // master data tuples
+  private final ArrayList<ArrayList<Double>> md = new ArrayList<>();
+  // timestamps, index-aligned with td
+  private final ArrayList<Long> td_time = new ArrayList<>();
+  private final int columnCnt;
+  // window size; -1 means "estimate in set_parameters"
+  private long omega;
+  // distance threshold; NaN means "estimate in set_parameters"
+  private double eta;
+  // number of neighbours requested from the k-d tree; -1 means "estimate in set_parameters"
+  private int k;
+  // per-column scale used by all distance computations
+  private double[] std;
+  private KDTreeUtil kdTreeUtil;
+
+  /**
+   * @param columnCnt number of time series columns (the first columns of every row)
+   * @param omega window size, or {@code -1} to have it estimated
+   * @param eta distance threshold, or {@code NaN} to have it estimated
+   * @param k neighbour count, or {@code -1} to have it estimated
+   */
+  public MasterRepairUtil(int columnCnt, long omega, double eta, int k) throws Exception {
+    this.columnCnt = columnCnt;
+    this.omega = omega;
+    this.eta = eta;
+    this.k = k;
+  }
+
+  /** Returns {@code true} when every column of the row is null. */
+  public boolean isNullRow(Row row) {
+    for (int i = 0; i < row.size(); i++) {
+      if (!row.isNull(i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Reads columns {@code [from, to)} of the row as doubles, keeping nulls as nulls.
+   *
+   * @return the tuple, or {@code null} when every column in the range is null
+   */
+  private static ArrayList<Double> readTuple(Row row, int from, int to) throws Exception {
+    ArrayList<Double> tuple = new ArrayList<>();
+    boolean containsNotNull = false;
+    for (int i = from; i < to; i++) {
+      if (row.isNull(i)) {
+        tuple.add(null);
+      } else {
+        containsNotNull = true;
+        BigDecimal bd = BigDecimal.valueOf(Util.getValueAsDouble(row, i));
+        tuple.add(bd.doubleValue());
+      }
+    }
+    return containsNotNull ? tuple : null;
+  }
+
+  /**
+   * Splits the row into a time series tuple and a master data tuple and buffers them.
+   *
+   * <p>A time series tuple is kept (together with the row time) when it has at least one
+   * non-null value. A master data tuple is kept only when all of its values are non-null.
+   */
+  public void addRow(Row row) throws Exception {
+    ArrayList<Double> tt = readTuple(row, 0, this.columnCnt); // time-series tuple
+    if (tt != null) {
+      td.add(tt);
+      td_time.add(row.getTime());
+    }
+
+    ArrayList<Double> mt = readTuple(row, this.columnCnt, row.size()); // master tuple
+    // The k-d tree unboxes every coordinate, so a master tuple with a missing value cannot be
+    // indexed; such tuples are left out instead of failing the whole repair later.
+    if (mt != null && !mt.contains(null)) {
+      md.add(mt);
+    }
+  }
+
+  /** Indexes the buffered master data tuples. */
+  public void buildKDTree() {
+    this.kdTreeUtil = KDTreeUtil.build(md, this.columnCnt);
+  }
+
+  /**
+   * Returns one column of the repaired tuples.
+   *
+   * @param columnPos 1-based column index
+   */
+  public ArrayList<Double> getCleanResultColumn(int columnPos) {
+    ArrayList<Double> column = new ArrayList<>();
+    for (ArrayList<Double> tuple : this.td_cleaned) {
+      column.add(tuple.get(columnPos - 1));
+    }
+    return column;
+  }
+
+  /** Returns the timestamps of the buffered time series tuples. */
+  public ArrayList<Long> getTime() {
+    return td_time;
+  }
+
+  /** Euclidean distance over the first {@code columnCnt} columns, each scaled by its std. */
+  public double get_tm_distance(ArrayList<Double> t_tuple, ArrayList<Double> m_tuple) {
+    double distance = 0d;
+    for (int pos = 0; pos < columnCnt; pos++) {
+      double temp = t_tuple.get(pos) - m_tuple.get(pos);
+      temp = temp / std[pos];
+      distance += temp * temp;
+    }
+    distance = Math.sqrt(distance);
+    return distance;
+  }
+
+  /**
+   * Returns the indices of earlier tuples whose time is at most {@code omega} before tuple
+   * {@code i}, latest first.
+   *
+   * <p>NOTE(review): the scan covers all earlier tuples; set_parameters stops at the first one
+   * outside the window, which is equivalent only if timestamps are non-decreasing - confirm.
+   */
+  public ArrayList<Integer> cal_W(int i) {
+    ArrayList<Integer> W_i = new ArrayList<>();
+    for (int l = i - 1; l >= 0; l--) {
+      if (this.td_time.get(i) <= this.td_time.get(l) + omega) {
+        W_i.add(l);
+      }
+    }
+    return W_i;
+  }
+
+  /**
+   * Collects the repair candidates for tuple {@code i}: with an empty window the single nearest
+   * master tuple, otherwise the neighbours of the tuple itself and of every repaired tuple in
+   * the window.
+   */
+  public ArrayList<ArrayList<Double>> cal_C(int i, ArrayList<Integer> W_i) {
+    ArrayList<ArrayList<Double>> C_i = new ArrayList<>();
+    if (W_i.size() == 0) {
+      C_i.add(this.kdTreeUtil.query(this.td.get(i), std));
+    } else {
+      C_i.addAll(this.kdTreeUtil.queryKNN(this.td.get(i), k, std));
+      for (Integer integer : W_i) {
+        C_i.addAll(this.kdTreeUtil.queryKNN(this.td_cleaned.get(integer), k, std));
+      }
+    }
+    return C_i;
+  }
+
+  /**
+   * Repairs the tuples in order. A candidate qualifies when its distance to every repaired tuple
+   * in the window is at most {@code eta}; among the qualifying candidates the one closest to the
+   * original tuple is chosen.
+   */
+  public void master_repair() {
+    for (int i = 0; i < this.td.size(); i++) {
+      ArrayList<Double> tuple = this.td.get(i);
+      ArrayList<Integer> W_i = cal_W(i);
+      ArrayList<ArrayList<Double>> C_i = this.cal_C(i, W_i);
+      double min_dis = Double.MAX_VALUE;
+      // Keep the original tuple when no candidate qualifies; an empty placeholder here would
+      // break later distance computations and the column extraction.
+      ArrayList<Double> repair_tuple = tuple;
+      for (ArrayList<Double> c_i : C_i) {
+        boolean smooth = true;
+        for (Integer w_i : W_i) {
+          ArrayList<Double> w_is = td_cleaned.get(w_i);
+          if (get_tm_distance(c_i, w_is) > eta) {
+            smooth = false;
+            break;
+          }
+        }
+        if (smooth) {
+          double dis = get_tm_distance(c_i, tuple);
+          if (dis < min_dis) {
+            min_dis = dis;
+            repair_tuple = c_i;
+          }
+        }
+      }
+      this.td_cleaned.add(repair_tuple);
+    }
+  }
+
+  /**
+   * Estimates every parameter that was left at its sentinel value.
+   *
+   * <ul>
+   *   <li>{@code omega}: ten times the median interval between consecutive timestamps
+   *   <li>{@code eta}: the 0.9973 quantile of distances between tuples inside the window
+   *   <li>{@code k}: the first value in 2..5 whose 0.9 quantile neighbour distance exceeds
+   *       {@code eta}, otherwise 5
+   * </ul>
+   */
+  public void set_parameters() {
+    if (omega == -1) {
+      ArrayList<Long> intervals = getIntervals();
+      if (intervals.isEmpty()) {
+        // fewer than two tuples: there is no interval to derive a window from
+        omega = 0;
+      } else {
+        Collections.sort(intervals);
+        long interval = intervals.get(intervals.size() / 2);
+        omega = interval * 10;
+      }
+    }
+    if (Double.isNaN(eta)) {
+      ArrayList<Double> distance_list = new ArrayList<>();
+      for (int i = 1; i < this.td.size(); i++) {
+        for (int l = i - 1; l >= 0; l--) {
+          if (this.td_time.get(i) <= this.td_time.get(l) + omega) {
+            distance_list.add(get_tm_distance(this.td.get(i), this.td.get(l)));
+          } else {
+            break;
+          }
+        }
+      }
+      if (distance_list.isEmpty()) {
+        // no pair of tuples shares a window, so no threshold can be derived; do not constrain
+        eta = Double.POSITIVE_INFINITY;
+      } else {
+        Collections.sort(distance_list);
+        eta = distance_list.get((int) (distance_list.size() * 0.9973));
+      }
+    }
+    if (k == -1) {
+      for (int temp_k = 2; temp_k <= 5; temp_k++) {
+        ArrayList<Double> distance_list = new ArrayList<>();
+        for (ArrayList<Double> tuple : this.td) {
+          ArrayList<ArrayList<Double>> neighbors = this.kdTreeUtil.queryKNN(tuple, temp_k, std);
+          for (ArrayList<Double> neighbor : neighbors) {
+            distance_list.add(get_tm_distance(tuple, neighbor));
+          }
+        }
+        if (distance_list.isEmpty()) {
+          break;
+        }
+        Collections.sort(distance_list);
+        if (distance_list.get((int) (distance_list.size() * 0.9)) > eta) {
+          k = temp_k;
+          break;
+        }
+      }
+      if (k == -1) {
+        k = 5;
+      }
+    }
+  }
+
+  /** Population variance of the non-NaN entries; 0 when there are none. */
+  private double varianceImperative(double[] value) {
+    double average = 0.0;
+    int cnt = 0;
+    for (double p : value) {
+      if (!Double.isNaN(p)) {
+        cnt += 1;
+        average += p;
+      }
+    }
+    if (cnt == 0) {
+      return 0d;
+    }
+    average /= cnt;
+
+    double variance = 0.0;
+    for (double p : value) {
+      if (!Double.isNaN(p)) {
+        variance += (p - average) * (p - average);
+      }
+    }
+    return variance / cnt;
+  }
+
+  /** Copies one column of the time series tuples into an array. */
+  private double[] getColumn(int pos) {
+    double[] column = new double[this.td.size()];
+    for (int i = 0; i < this.td.size(); i++) {
+      column[i] = this.td.get(i).get(pos);
+    }
+    return column;
+  }
+
+  /** Computes the per-column standard deviation used to scale distances. */
+  public void call_std() {
+    this.std = new double[this.columnCnt];
+    for (int i = 0; i < this.columnCnt; i++) {
+      double deviation = Math.sqrt(varianceImperative(getColumn(i)));
+      // A constant column has deviation 0; dividing by it would turn every distance into NaN or
+      // Infinity, so such a column is left unscaled.
+      std[i] = deviation > 0 ? deviation : 1.0;
+    }
+  }
+
+  /**
+   * Runs the whole pipeline: fill nulls, index the master data, compute scales, estimate missing
+   * parameters and repair. Does nothing when no time series tuple was buffered; when no master
+   * data was buffered the (null-filled) tuples are passed through unchanged.
+   */
+  public void repair() {
+    if (this.td.isEmpty()) {
+      return;
+    }
+    fillNullValue();
+    if (this.md.isEmpty()) {
+      // nothing to repair against
+      this.td_cleaned.addAll(this.td);
+      return;
+    }
+    buildKDTree();
+    call_std();
+    set_parameters();
+    master_repair();
+  }
+
+  /** Differences between consecutive timestamps. */
+  public ArrayList<Long> getIntervals() {
+    ArrayList<Long> intervals = new ArrayList<>();
+    for (int i = 1; i < this.td_time.size(); i++) {
+      intervals.add(this.td_time.get(i) - this.td_time.get(i - 1));
+    }
+    return intervals;
+  }
+
+  /**
+   * Replaces nulls in the time series tuples column by column: a null takes the most recent
+   * non-null value above it, and nulls before the first non-null value take that first value.
+   *
+   * @throws IllegalStateException if a column has no non-null value at all
+   */
+  public void fillNullValue() {
+    if (this.td.isEmpty()) {
+      return;
+    }
+    for (int i = 0; i < columnCnt; i++) {
+      // the first row may itself be null in this column, so look for the first real value
+      Double seed = null;
+      for (ArrayList<Double> arrayList : this.td) {
+        if (arrayList.get(i) != null) {
+          seed = arrayList.get(i);
+          break;
+        }
+      }
+      if (seed == null) {
+        throw new IllegalStateException(
+            "Time series column " + (i + 1) + " contains no non-null value.");
+      }
+      double temp = seed;
+      for (ArrayList<Double> arrayList : this.td) {
+        if (arrayList.get(i) == null) {
+          arrayList.set(i, temp);
+        } else {
+          temp = arrayList.get(i);
+        }
+      }
+    }
+  }
+}