You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/08 12:39:11 UTC

[iotdb] branch master updated: [IOTDB-2304]Library-UDF Data Repairing Functions (#4833)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 87c8ab4ac8 [IOTDB-2304]Library-UDF Data Repairing Functions (#4833)
87c8ab4ac8 is described below

commit 87c8ab4ac830be9f3b28a0548035975f563eb5c7
Author: Pengyu Chen <48...@users.noreply.github.com>
AuthorDate: Fri Apr 8 20:39:07 2022 +0800

    [IOTDB-2304]Library-UDF Data Repairing Functions (#4833)
---
 docs/zh/UserGuide/UDF-Library/Data-Repairing.md    |   1 -
 .../iotdb/library/drepair/UDTFTimestampRepair.java | 102 +++++++
 .../iotdb/library/drepair/UDTFValueFill.java       | 104 +++++++
 .../iotdb/library/drepair/UDTFValueRepair.java     | 122 ++++++++
 .../apache/iotdb/library/drepair/util/ARFill.java  |  93 ++++++
 .../iotdb/library/drepair/util/LikelihoodFill.java | 128 +++++++++
 .../iotdb/library/drepair/util/LinearFill.java     |  57 ++++
 .../iotdb/library/drepair/util/LsGreedy.java       | 141 +++++++++
 .../apache/iotdb/library/drepair/util/MAFill.java  |  57 ++++
 .../iotdb/library/drepair/util/MeanFill.java       |  40 +++
 .../iotdb/library/drepair/util/PreviousFill.java   |  49 ++++
 .../apache/iotdb/library/drepair/util/Screen.java  | 138 +++++++++
 .../iotdb/library/drepair/util/ScreenFill.java     | 154 ++++++++++
 .../library/drepair/util/TimestampInterval.java    | 213 ++++++++++++++
 .../library/drepair/util/TimestampRepair.java      | 154 ++++++++++
 .../iotdb/library/drepair/util/ValueFill.java      |  81 ++++++
 .../iotdb/library/drepair/util/ValueRepair.java    | 130 +++++++++
 .../apache/iotdb/library/drepair/DRepairTests.java | 317 +++++++++++++++++++++
 18 files changed, 2080 insertions(+), 1 deletion(-)

diff --git a/docs/zh/UserGuide/UDF-Library/Data-Repairing.md b/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
index 7a2c7637e5..e75d7c1782 100644
--- a/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
+++ b/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
@@ -135,7 +135,6 @@ select timestamprepair(s1) from root.test.d2
 **备注:** AR 模型采用 AR(1),时序列需满足自相关条件,否则将输出单个数据点 (0, 0.0).
 
 ### 使用示例
-
 #### 使用 linear 方法进行填补
 
 当`method`缺省或取值为 'linear' 时,本函数将使用线性插值方法进行填补。
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java
new file mode 100644
index 0000000000..85bcbc3bb0
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.drepair;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.library.drepair.util.TimestampRepair;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * UDTF that repairs the timestamps of a single numeric series.
+ *
+ * <p>Parameters read from the query: {@code interval} (int, default 0) and {@code method} (string,
+ * default "Median"; "Mode" and "Cluster" are also accepted, case-insensitively). A positive {@code
+ * interval} takes precedence over {@code method}.
+ */
+public class UDTFTimestampRepair implements UDTF {
+  /** Name of the interval estimation method, as given by the {@code method} parameter. */
+  String intervalMethod;
+
+  /** Raw value of the {@code interval} parameter (0 when it was not supplied). */
+  int interval;
+
+  /**
+   * Value handed to {@link TimestampRepair}: the user interval when it is positive, otherwise -1
+   * for "Median", -2 for "Mode" and -3 for "Cluster".
+   */
+  int intervalMode;
+
+  /**
+   * Requires exactly one input series of type DOUBLE, FLOAT, INT32 or INT64 and a non-negative
+   * {@code interval}.
+   */
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(
+            0, TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT32, TSDataType.INT64)
+        .validate(
+            // 0 has to pass because it is the default used when no interval is given.
+            x -> (Integer) x >= 0,
+            "Interval should be a positive integer.",
+            validator.getParameters().getIntOrDefault("interval", 0));
+  }
+
+  /**
+   * Configures the access strategy and output type, then resolves {@link #intervalMode} from the
+   * {@code interval} and {@code method} parameters.
+   *
+   * @throws Exception if no positive interval is given and {@code method} is not one of "Median",
+   *     "Mode" or "Cluster"
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    // NOTE(review): the maximal window size presumably delivers the whole series in one window.
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
+        .setOutputDataType(parameters.getDataType(0));
+    intervalMethod = parameters.getStringOrDefault("method", "Median");
+    interval = parameters.getIntOrDefault("interval", 0);
+    if (interval > 0) {
+      intervalMode = interval;
+    } else if ("Median".equalsIgnoreCase(intervalMethod)) {
+      intervalMode = -1;
+    } else if ("Mode".equalsIgnoreCase(intervalMethod)) {
+      intervalMode = -2;
+    } else if ("Cluster".equalsIgnoreCase(intervalMethod)) {
+      intervalMode = -3;
+    } else {
+      throw new Exception("Illegal method.");
+    }
+  }
+
+  /**
+   * Runs the repair on the window and emits every repaired point, narrowing the repaired value
+   * (held as a double) back to the type of the input series with a plain cast.
+   *
+   * @throws Exception if the input series has a data type other than the four validated ones
+   */
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    // NOTE(review): the meaning of the third constructor argument (2) is not visible here.
+    TimestampRepair ts = new TimestampRepair(rowWindow.getRowIterator(), intervalMode, 2);
+    ts.dpRepair();
+    long[] timestamp = ts.getRepaired();
+    double[] value = ts.getRepairedValue();
+    switch (rowWindow.getDataType(0)) {
+      case DOUBLE:
+        for (int i = 0; i < timestamp.length; i++) {
+          collector.putDouble(timestamp[i], value[i]);
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < timestamp.length; i++) {
+          collector.putFloat(timestamp[i], (float) value[i]);
+        }
+        break;
+      case INT32:
+        for (int i = 0; i < timestamp.length; i++) {
+          collector.putInt(timestamp[i], (int) value[i]);
+        }
+        break;
+      case INT64:
+        for (int i = 0; i < timestamp.length; i++) {
+          collector.putLong(timestamp[i], (long) value[i]);
+        }
+        break;
+      default:
+        // Carry the offending type so the failure can be diagnosed from the message alone.
+        throw new Exception("Unsupported data type: " + rowWindow.getDataType(0));
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFValueFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFValueFill.java
new file mode 100644
index 0000000000..9d148b573b
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFValueFill.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.library.drepair.util.ARFill;
+import org.apache.iotdb.library.drepair.util.LikelihoodFill;
+import org.apache.iotdb.library.drepair.util.LinearFill;
+import org.apache.iotdb.library.drepair.util.MeanFill;
+import org.apache.iotdb.library.drepair.util.PreviousFill;
+import org.apache.iotdb.library.drepair.util.ScreenFill;
+import org.apache.iotdb.library.drepair.util.ValueFill;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * UDTF that fills the missing values of a single numeric series.
+ *
+ * <p>The {@code method} parameter (default "linear") selects the filler, case-insensitively:
+ * "previous", "linear", "mean", "ar", "screen" or "likelihood".
+ */
+public class UDTFValueFill implements UDTF {
+  /** Name of the fill method, read from the {@code method} parameter. */
+  private String method;
+
+  /** Requires exactly one input series of type FLOAT, DOUBLE, INT32 or INT64. */
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(
+            0, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.INT32, TSDataType.INT64);
+  }
+
+  /** Configures the access strategy and output type and stores the requested fill method. */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    // NOTE(review): the maximal window size presumably delivers the whole series in one window.
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
+        .setOutputDataType(parameters.getDataType(0));
+    method = parameters.getStringOrDefault("method", "linear");
+  }
+
+  /**
+   * Builds the filler selected by {@code method}, runs it on the window and emits the filled
+   * points. Integer outputs are rounded to the nearest integer; FLOAT outputs are narrowed.
+   *
+   * @throws Exception if {@code method} is not recognised or the input series has a data type
+   *     other than the four validated ones
+   */
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    ValueFill vf;
+    if ("previous".equalsIgnoreCase(method)) {
+      vf = new PreviousFill(rowWindow.getRowIterator());
+    } else if ("linear".equalsIgnoreCase(method)) {
+      vf = new LinearFill(rowWindow.getRowIterator());
+    } else if ("mean".equalsIgnoreCase(method)) {
+      vf = new MeanFill(rowWindow.getRowIterator());
+    } else if ("ar".equalsIgnoreCase(method)) {
+      vf = new ARFill(rowWindow.getRowIterator());
+    } else if ("screen".equalsIgnoreCase(method)) {
+      vf = new ScreenFill(rowWindow.getRowIterator());
+    } else if ("likelihood".equalsIgnoreCase(method)) {
+      vf = new LikelihoodFill(rowWindow.getRowIterator());
+    } else {
+      throw new Exception("Illegal method");
+    }
+    vf.fill();
+    double[] repaired = vf.getFilled();
+    long[] time = vf.getTime();
+    switch (rowWindow.getDataType(0)) {
+      case DOUBLE:
+        for (int i = 0; i < time.length; i++) {
+          collector.putDouble(time[i], repaired[i]);
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < time.length; i++) {
+          collector.putFloat(time[i], (float) repaired[i]);
+        }
+        break;
+      case INT32:
+        for (int i = 0; i < time.length; i++) {
+          collector.putInt(time[i], (int) Math.round(repaired[i]));
+        }
+        break;
+      case INT64:
+        for (int i = 0; i < time.length; i++) {
+          collector.putLong(time[i], Math.round(repaired[i]));
+        }
+        break;
+      default:
+        // Carry the offending type so the failure can be diagnosed from the message alone.
+        throw new Exception("Unsupported data type: " + rowWindow.getDataType(0));
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFValueRepair.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFValueRepair.java
new file mode 100644
index 0000000000..a7cd8ca774
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFValueRepair.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.drepair;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.library.drepair.util.LsGreedy;
+import org.apache.iotdb.library.drepair.util.Screen;
+import org.apache.iotdb.library.drepair.util.ValueRepair;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * UDTF that repairs the values of a single numeric series.
+ *
+ * <p>The {@code method} parameter (default "screen") selects the algorithm, case-insensitively:
+ * "screen" honours {@code minSpeed} / {@code maxSpeed}; "lsgreedy" honours {@code center} and
+ * {@code sigma}. A parameter that is not supplied is left at whatever the algorithm class sets up
+ * itself, except {@code center}, which is always set (default 0).
+ */
+public class UDTFValueRepair implements UDTF {
+  /** Name of the repair method, read from the {@code method} parameter. */
+  String method;
+
+  /** Lower speed bound for "screen"; NaN when not supplied. */
+  double minSpeed;
+
+  /** Upper speed bound for "screen"; NaN when not supplied. */
+  double maxSpeed;
+
+  /** Centre of the speed-change distribution for "lsgreedy" (default 0). */
+  double center;
+
+  /** Spread of the speed-change distribution for "lsgreedy"; NaN when not supplied. */
+  double sigma;
+
+  /**
+   * Requires exactly one input series of type FLOAT, DOUBLE, INT32 or INT64, {@code sigma > 0} and
+   * {@code minSpeed < maxSpeed}.
+   */
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    // NOTE(review): the speed check substitutes -1 / 1 for a missing bound, whereas beforeStart
+    // treats a missing bound as "not set"; supplying only one bound may therefore be rejected
+    // here. Confirm whether that is intended.
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(
+            0, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.INT32, TSDataType.INT64)
+        .validate(
+            x -> (double) x > 0,
+            "Parameter $sigma$ should be larger than 0.",
+            validator.getParameters().getDoubleOrDefault("sigma", 1.0))
+        .validate(
+            params -> (double) params[0] < (double) params[1],
+            "parameter $minSpeed$ should be smaller than $maxSpeed$.",
+            validator.getParameters().getDoubleOrDefault("minSpeed", -1),
+            validator.getParameters().getDoubleOrDefault("maxSpeed", 1));
+  }
+
+  /**
+   * Configures the access strategy and output type and reads the tuning parameters. NaN marks a
+   * parameter that was not supplied.
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    // NOTE(review): the maximal window size presumably delivers the whole series in one window.
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
+        .setOutputDataType(parameters.getDataType(0));
+    method = parameters.getStringOrDefault("method", "screen");
+    minSpeed = parameters.getDoubleOrDefault("minSpeed", Double.NaN);
+    maxSpeed = parameters.getDoubleOrDefault("maxSpeed", Double.NaN);
+    center = parameters.getDoubleOrDefault("center", 0);
+    sigma = parameters.getDoubleOrDefault("sigma", Double.NaN);
+  }
+
+  /**
+   * Builds the repairer selected by {@code method}, applies the supplied parameters, runs it on
+   * the window and emits the repaired points. Integer outputs are rounded to the nearest integer;
+   * FLOAT outputs are narrowed.
+   *
+   * @throws Exception if {@code method} is not recognised or the input series has a data type
+   *     other than the four validated ones
+   */
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    ValueRepair vr;
+    if ("screen".equalsIgnoreCase(method)) {
+      Screen screen = new Screen(rowWindow.getRowIterator());
+      if (!Double.isNaN(minSpeed)) {
+        screen.setSmin(minSpeed);
+      }
+      if (!Double.isNaN(maxSpeed)) {
+        screen.setSmax(maxSpeed);
+      }
+      vr = screen;
+    } else if ("lsgreedy".equalsIgnoreCase(method)) {
+      LsGreedy lsGreedy = new LsGreedy(rowWindow.getRowIterator());
+      if (!Double.isNaN(sigma)) {
+        lsGreedy.setSigma(sigma);
+      }
+      lsGreedy.setCenter(center);
+      vr = lsGreedy;
+    } else {
+      throw new Exception("Illegal method.");
+    }
+    vr.repair();
+    double[] repaired = vr.getRepaired();
+    long[] time = vr.getTime();
+    switch (rowWindow.getDataType(0)) {
+      case DOUBLE:
+        for (int i = 0; i < time.length; i++) {
+          collector.putDouble(time[i], repaired[i]);
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < time.length; i++) {
+          collector.putFloat(time[i], (float) repaired[i]);
+        }
+        break;
+      case INT32:
+        for (int i = 0; i < time.length; i++) {
+          collector.putInt(time[i], (int) Math.round(repaired[i]));
+        }
+        break;
+      case INT64:
+        for (int i = 0; i < time.length; i++) {
+          collector.putLong(time[i], Math.round(repaired[i]));
+        }
+        break;
+      default:
+        // Carry the offending type so the failure can be diagnosed from the message alone.
+        throw new Exception("Unsupported data type: " + rowWindow.getDataType(0));
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ARFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ARFill.java
new file mode 100644
index 0000000000..90e5b7c783
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ARFill.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+/**
+ * Fills missing (NaN) values with a first-order autoregressive model, x_t = theta * x_{t-1} +
+ * epsilon.
+ *
+ * <p>When the fitted coefficient is not below 1 the model is rejected and the result is replaced
+ * by the single point (0, 0.0).
+ */
+public class ARFill extends ValueFill {
+  // TODO Higher order AR regression
+  private int order = 1;
+  private double theta = 1e10;
+
+  public ARFill(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+    calMeanAndVar();
+  }
+
+  /** Stores the requested order; {@link #fill()} currently always fits AR(1). */
+  public void setOrder(int order) {
+    this.order = order;
+  }
+
+  /**
+   * Fits theta, estimates the mean residual from adjacent pairs of valid points and writes the
+   * filled series into {@code repaired}. A missing first point is filled with the series mean;
+   * any other missing point is predicted from the already filled predecessor.
+   */
+  @Override
+  public void fill() {
+    // theta = sum(x_t * x_{t+1}) / sum(x_t^2), with missing values counted as 0.
+    double acf = 0;
+    double factor = 0;
+    for (int i = 0; i < original.length - 1; i++) {
+      double left = Double.isNaN(original[i]) ? 0 : original[i];
+      double right = Double.isNaN(original[i + 1]) ? 0 : original[i + 1];
+      acf += left * right;
+      factor += left * left;
+    }
+    this.theta = acf / factor;
+    // An explicit check instead of `assert`: assertions are disabled unless the JVM runs with
+    // -ea, so the rejection would otherwise never trigger. The negated form also rejects a NaN
+    // theta (factor == 0).
+    if (!(this.theta < 1)) {
+      System.out.println("Cannot fit AR(1) model. Please try another method.");
+      this.time = new long[] {0};
+      this.repaired = new double[] {0D};
+      return;
+    }
+    // Mean residual over the pairs where both neighbours are present.
+    // NOTE(review): with no such pair this is 0/0 = NaN and every filled value becomes NaN.
+    double meanEpsilon = 0;
+    double pairCount = 0;
+    for (int i = 0; i < original.length - 1; i++) {
+      double left = original[i];
+      double right = original[i + 1];
+      if (Double.isNaN(left) || Double.isNaN(right)) {
+        continue;
+      }
+      pairCount += 1;
+      meanEpsilon += right - left * this.theta;
+    }
+    meanEpsilon /= pairCount;
+    for (int i = 0; i < original.length; i++) {
+      double yt = original[i];
+      if (!Double.isNaN(yt)) {
+        repaired[i] = yt;
+      } else if (i != 0) {
+        repaired[i] = this.theta * repaired[i - 1] + meanEpsilon;
+      } else {
+        repaired[i] = this.mean;
+      }
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LikelihoodFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LikelihoodFill.java
new file mode 100644
index 0000000000..f293a16734
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LikelihoodFill.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Fills missing (NaN) values that lie between two valid points by iteratively minimising the
+ * squared speed changes around each filled point.
+ *
+ * <p>Missing values before the first or after the last valid point are not repaired and stay NaN.
+ */
+public class LikelihoodFill extends ValueFill {
+
+  private double stopErrorRatio = 0.0001;
+  private int stopIteration = Integer.MAX_VALUE;
+
+  public LikelihoodFill(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+  }
+
+  /**
+   * Seeds every interior gap with a time-weighted linear interpolation, then repeatedly replaces
+   * each seeded value by the minimiser of the quadratic built from its neighbours until the
+   * largest relative change drops to {@code stopErrorRatio} or {@code stopIteration} is reached.
+   */
+  @Override
+  public void fill() {
+    // The two lists are kept strictly parallel: entry k of one belongs to entry k of the other.
+    // Only interior gaps are recorded; leading and trailing NaNs never enter the lists, which
+    // previously misaligned them and could raise IndexOutOfBoundsException.
+    List<Integer> repairIndexList = new ArrayList<>();
+    List<Double> repairValueList = new ArrayList<>();
+    int previousNotNaN = -1;
+    for (int i = 0; i < n; i++) {
+      if (Double.isNaN(original[i])) {
+        repaired[i] = original[i];
+        continue;
+      }
+      if (previousNotNaN >= 0 && previousNotNaN + 1 != i) {
+        double delta =
+            (original[i] - original[previousNotNaN]) / (time[i] - time[previousNotNaN]);
+        for (int j = previousNotNaN + 1; j < i; j++) {
+          repaired[j] = original[previousNotNaN] + delta * (time[j] - time[previousNotNaN]);
+          repairIndexList.add(j);
+          repairValueList.add(repaired[j]);
+        }
+      }
+      repaired[i] = original[i];
+      previousNotNaN = i;
+    }
+    // update filled values iteratively
+    double errorRatio = 1;
+    int iteration = 0;
+    while (errorRatio > stopErrorRatio && iteration < stopIteration) {
+      errorRatio = 0.0;
+      // Compute all new values from the previous iteration before applying any of them.
+      for (int i = 0; i < repairIndexList.size(); i++) {
+        // An interior index always has valid (original or seeded) direct neighbours.
+        int currentIndex = repairIndexList.get(i);
+        double intervalPrev1 = time[currentIndex] - time[currentIndex - 1];
+        double intervalPost1 = time[currentIndex + 1] - time[currentIndex];
+        // Speed change at the previous point; skipped when the point two back is unavailable.
+        double squareAPrev = 0.0, squareBPrev = 0.0;
+        if (currentIndex >= 2 && !Double.isNaN(repaired[currentIndex - 2])) {
+          // Positive interval, mirroring intervalPost2 below (the sign used to be flipped).
+          double intervalPrev2 = time[currentIndex - 1] - time[currentIndex - 2];
+          squareAPrev = 1.0 / (intervalPrev1 * intervalPrev1);
+          squareBPrev =
+              2.0 * repaired[currentIndex - 2] / (intervalPrev2 * intervalPrev1)
+                  - 2.0
+                      * (intervalPrev2 + intervalPrev1)
+                      * repaired[currentIndex - 1]
+                      / (intervalPrev2 * intervalPrev1 * intervalPrev1);
+        }
+        // Speed change at the current point.
+        double squareACurr =
+            (intervalPrev1 + intervalPost1)
+                * (intervalPrev1 + intervalPost1)
+                / (intervalPrev1 * intervalPrev1 * intervalPost1 * intervalPost1);
+        double squareBCurr =
+            -2.0
+                    * (intervalPrev1 + intervalPost1)
+                    * repaired[currentIndex - 1]
+                    / (intervalPrev1 * intervalPrev1 * intervalPost1)
+                - 2.0
+                    * (intervalPrev1 + intervalPost1)
+                    * repaired[currentIndex + 1]
+                    / (intervalPrev1 * intervalPost1 * intervalPost1);
+        // Speed change at the next point; skipped when the point two ahead is unavailable.
+        double squareAPost = 0.0, squareBPost = 0.0;
+        if (currentIndex <= n - 3 && !Double.isNaN(repaired[currentIndex + 2])) {
+          double intervalPost2 = time[currentIndex + 2] - time[currentIndex + 1];
+          squareAPost = 1.0 / (intervalPost1 * intervalPost1);
+          squareBPost =
+              2.0 * repaired[currentIndex + 2] / (intervalPost1 * intervalPost2)
+                  - 2.0
+                      * (intervalPost1 + intervalPost2)
+                      * repaired[currentIndex + 1]
+                      / (intervalPost1 * intervalPost1 * intervalPost2);
+        }
+        // Minimiser of A*x^2 + B*x is -B / (2A).
+        repairValueList.set(
+            i,
+            -(squareBPrev + squareBCurr + squareBPost)
+                / (2.0 * (squareAPrev + squareACurr + squareAPost)));
+      }
+      for (int i = 0; i < repairIndexList.size(); i++) {
+        int currentIndex = repairIndexList.get(i);
+        double previousRepair = repaired[currentIndex];
+        double updatedRepair = repairValueList.get(i);
+        // Relative to the magnitude of the previous value so negative series are measured
+        // correctly; fall back to the absolute change when the previous value is exactly 0.
+        double change = Math.abs(updatedRepair - previousRepair);
+        double scale = Math.abs(previousRepair);
+        double updatedRatio = scale > 0 ? change / scale : change;
+        errorRatio = Math.max(updatedRatio, errorRatio);
+        repaired[currentIndex] = updatedRepair;
+      }
+      iteration++;
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LinearFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LinearFill.java
new file mode 100644
index 0000000000..8c8a41b29b
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LinearFill.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+/**
+ * Fills missing (NaN) values by linear interpolation between the neighbouring valid points.
+ *
+ * <p>Interpolation is done over array positions, not timestamps. Missing values before the first
+ * valid point take the value of that point; missing values after the last valid point take the
+ * value of the last valid point. A series without any valid point is left untouched.
+ */
+public class LinearFill extends ValueFill {
+
+  /** Position of the most recent valid point seen by {@link #fill()}, or -1 if none yet. */
+  private int prevNotNaN = -1;
+
+  public LinearFill(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+  }
+
+  /** Writes the filled series into {@code repaired}. */
+  @Override
+  public void fill() {
+    // Start from a clean state so that calling fill() again gives the same result.
+    prevNotNaN = -1;
+    for (int i = 0; i < original.length; i++) {
+      if (Double.isNaN(original[i])) {
+        continue;
+      }
+      // Slope per position from the previous valid point. It stays 0 when there is none, which
+      // back-fills a leading gap with original[i]. The test must be >= 0: position 0 is a
+      // legitimate previous point, and excluding it flattened the first gap.
+      double k = 0;
+      if (prevNotNaN >= 0) {
+        k = (original[i] - original[prevNotNaN]) / (i - prevNotNaN);
+      }
+      for (int t = prevNotNaN + 1; t < i; t++) {
+        repaired[t] = original[i] + k * (t - i);
+      }
+      repaired[i] = original[i];
+      prevNotNaN = i;
+    }
+    // Carry the last valid value forward over a trailing gap.
+    if (prevNotNaN >= 0) {
+      for (int t = prevNotNaN + 1; t < original.length; t++) {
+        repaired[t] = original[prevNotNaN];
+      }
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LsGreedy.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LsGreedy.java
new file mode 100644
index 0000000000..276841c76f
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LsGreedy.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+import org.apache.iotdb.library.util.Util;
+
+import java.util.PriorityQueue;
+
+/**
+ * Greedy value repair driven by speed changes: the interior point whose speed change lies
+ * furthest from {@code center} is adjusted repeatedly until every speed change is within three
+ * {@code sigma} of {@code center}.
+ */
+public class LsGreedy extends ValueRepair {
+
+  private double center = 0, sigma;
+  private final double eps = 1e-12;
+
+  public LsGreedy(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+    setParameters();
+  }
+
+  public LsGreedy(String filename) throws Exception {
+    super(filename);
+    setParameters();
+  }
+
+  /** Derives the default sigma from the speed changes of the input via {@code Util.mad}. */
+  private void setParameters() {
+    sigma = Util.mad(Util.variation(Util.speed(original, time)));
+  }
+
+  /**
+   * Repairs a copy of the original values in place. The first and last points are never modified
+   * because they have no speed change.
+   */
+  @Override
+  public void repair() {
+    repaired = original.clone();
+    RepairNode[] nodeAt = new RepairNode[n];
+    PriorityQueue<RepairNode> candidates = new PriorityQueue<>();
+    for (int i = 1; i < n - 1; i++) {
+      track(i, nodeAt, candidates);
+    }
+    // Keep going while the worst candidate is still outside center +- 3 sigma (or eps, when
+    // sigma is tiny); an empty queue ends the loop as well.
+    for (RepairNode worst = candidates.peek();
+        worst != null && !(Math.abs(worst.getU() - center) < Math.max(eps, 3 * sigma));
+        worst = candidates.peek()) {
+      worst.modify();
+      // Changing one value alters the speed change of the point itself and of both neighbours.
+      int first = Math.max(1, worst.getIndex() - 1);
+      int last = Math.min(n - 2, worst.getIndex() + 1);
+      for (int i = first; i <= last; i++) {
+        candidates.remove(nodeAt[i]);
+        track(i, nodeAt, candidates);
+      }
+    }
+  }
+
+  /**
+   * Recomputes the node of position {@code i}, records it in {@code nodeAt} and queues it when
+   * its speed change deviates from {@code center} by more than three sigma.
+   */
+  private void track(int i, RepairNode[] nodeAt, PriorityQueue<RepairNode> candidates) {
+    RepairNode fresh = new RepairNode(i);
+    nodeAt[i] = fresh;
+    if (Math.abs(fresh.getU() - center) > 3 * sigma) {
+      candidates.add(fresh);
+    }
+  }
+
+  /** Snapshot of the speed change at one interior position, ordered by largest deviation first. */
+  class RepairNode implements Comparable<RepairNode> {
+
+    private final int index;
+    private final double u; // speed variation
+
+    /** Captures the speed after the point minus the speed before it, from {@code repaired}. */
+    public RepairNode(int index) {
+      this.index = index;
+      double speedAfter = repaired[index + 1] - repaired[index];
+      speedAfter = speedAfter / (time[index + 1] - time[index]);
+      double speedBefore = repaired[index] - repaired[index - 1];
+      speedBefore = speedBefore / (time[index] - time[index - 1]);
+      this.u = speedAfter - speedBefore;
+    }
+
+    /**
+     * Shifts the repaired value of this position towards a smaller deviation. The shift is based
+     * on the larger of sigma and a third of the current deviation (the full deviation when sigma
+     * is below eps), scaled by the surrounding time intervals.
+     */
+    public void modify() {
+      double deviation = Math.abs(u - center);
+      double shift = sigma < eps ? deviation : Math.max(sigma, deviation / 3);
+      shift *=
+          (double) (time[index + 1] - time[index])
+              * (time[index] - time[index - 1])
+              / (time[index + 1] - time[index - 1]);
+      if (this.u > center) {
+        repaired[index] += shift;
+      } else {
+        repaired[index] -= shift;
+      }
+    }
+
+    /** Orders nodes so that the one deviating most from {@code center} comes first. */
+    @Override
+    public int compareTo(RepairNode o) {
+      double mine = Math.abs(this.u - center);
+      double theirs = Math.abs(o.u - center);
+      if (mine == theirs) {
+        return 0;
+      }
+      return mine > theirs ? -1 : 1;
+    }
+
+    public int getIndex() {
+      return index;
+    }
+
+    public double getU() {
+      return u;
+    }
+  }
+
+  public void setCenter(double center) {
+    this.center = center;
+  }
+
+  public void setSigma(double sigma) {
+    this.sigma = sigma;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MAFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MAFill.java
new file mode 100644
index 0000000000..c1e90d5382
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MAFill.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+public class MAFill extends ValueFill {
+  int window_size = 5;
+  double window_sum = 0;
+  int window_cnt = 0;
+  int l = 0;
+  int r = window_size - 1;
+
+  public MAFill(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+  }
+
+  @Override
+  public void fill() {
+    for (int i = l; i < r && i < original.length; i++) {
+      if (!Double.isNaN(original[i])) {
+        window_sum += original[i];
+        window_cnt += 1;
+      }
+    }
+    for (int i = 0; i < original.length; i++) {
+      if (!Double.isNaN(original[i])) {
+        repaired[i] = original[i];
+      } else {
+        repaired[i] = window_sum / window_cnt;
+      }
+      if (i <= (window_size - 1) / 2 || i >= original.length - (window_size - 1) / 2 - 1) continue;
+      if (!Double.isNaN(original[r])) {
+        window_sum += original[r];
+        window_cnt += 1;
+      }
+      l += 1;
+      r += 1;
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MeanFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MeanFill.java
new file mode 100644
index 0000000000..f7d8d6377b
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MeanFill.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+/** Fills every missing (NaN) value with the mean of the non-missing values of the series. */
+public class MeanFill extends ValueFill {
+
+  /** Loads the series and computes its mean once, up front. */
+  public MeanFill(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+    calMeanAndVar();
+  }
+
+  /** Copies present values unchanged and substitutes {@code mean} for each NaN. */
+  @Override
+  public void fill() {
+    int position = 0;
+    for (double value : original) {
+      repaired[position] = Double.isNaN(value) ? mean : value;
+      position++;
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/PreviousFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/PreviousFill.java
new file mode 100644
index 0000000000..a424d50b99
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/PreviousFill.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+public class PreviousFill extends ValueFill {
+
+  private long previousTime = -1;
+  private double previousValue;
+
+  public PreviousFill(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+  }
+
+  @Override
+  public void fill() {
+    // NaN at the beginning is not filled
+    for (int i = 0; i < original.length; i++) {
+      if (Double.isNaN(original[i])) {
+        if (previousTime == -1) { // 序列初始为空,直接pass
+          repaired[i] = original[i];
+        } else {
+          repaired[i] = previousValue;
+        }
+      } else {
+        previousTime = time[i];
+        previousValue = original[i];
+        repaired[i] = original[i];
+      }
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/Screen.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/Screen.java
new file mode 100644
index 0000000000..9bc3c93c75
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/Screen.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+import org.apache.iotdb.library.util.Util;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.math3.stat.descriptive.rank.Median;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+/**
+ * Value repair under a speed constraint: each point is adjusted so that the value change per unit
+ * of time relative to its predecessor lies within {@code [smin, smax]}, choosing the candidate
+ * that is the median of what the following points inside a time window of length {@code w} allow.
+ */
+public class Screen extends ValueRepair {
+
+  // lower and upper bound on the allowed speed (value change per unit of time)
+  private double smin, smax;
+  // length of the look-ahead window, in timestamp units
+  private double w;
+
+  /** Loads the series from the iterator and derives default parameters from the data. */
+  public Screen(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+    setParameters();
+  }
+
+  /** Loads the series from a file and derives default parameters from the data. */
+  public Screen(String filename) throws Exception {
+    super(filename);
+    setParameters();
+  }
+
+  /**
+   * Derives the defaults: speed bounds are the median of {@code Util.speed(original, time)} plus
+   * and minus three times {@code Util.mad} of it, and the window is five times the median of
+   * {@code Util.variation(time)}.
+   */
+  private void setParameters() {
+    // set the default speed threshold
+    double[] speed = Util.speed(original, time);
+    Median median = new Median();
+    double mid = median.evaluate(speed);
+    // NOTE(review): Util.mad is presumably the median absolute deviation - confirm in Util
+    double sigma = Util.mad(speed);
+    smax = mid + 3 * sigma;
+    smin = mid - 3 * sigma;
+    // set the default window size
+    double[] interval = Util.variation(time);
+    w = 5 * median.evaluate(interval);
+  }
+
+  /**
+   * Repairs the series in time order and stores the result in {@code repaired}.
+   *
+   * <p>Points are appended to a working list one by one; a point is finalised (via {@link #local})
+   * as soon as the newest appended point lies more than {@code w} after it, and all remaining
+   * points are finalised at the end. Finalised values replace the originals in the working list,
+   * so later points are constrained by already repaired predecessors.
+   */
+  @Override
+  public void repair() {
+    // fixed window
+    ArrayList<Pair<Long, Double>> ans = new ArrayList<>();
+    // reads index 0 unconditionally, so the series must not be empty
+    ans.add(Pair.of(time[0], original[0]));
+    int startIndex = 0;
+    for (int i = 1; i < n; i++) {
+      ans.add(Pair.of(time[i], original[i]));
+      while (ans.get(startIndex).getLeft() + w < ans.get(i).getLeft()) {
+        // sliding window
+        local(ans, startIndex);
+        startIndex++;
+      }
+    }
+    // finalise the points whose window reaches past the end of the series
+    while (startIndex < n) {
+      local(ans, startIndex);
+      startIndex++;
+    }
+    int k = 0;
+    for (Pair<Long, Double> p : ans) {
+      this.repaired[k] = p.getRight();
+      k++;
+    }
+  }
+
+  /**
+   * Returns the median candidate value for the point at {@code index}.
+   *
+   * <p>The candidates are the point's own value plus, for each of the {@code m} following points
+   * within {@code w} of it, that point's value projected back to this timestamp at speed {@code
+   * smin} and at speed {@code smax}; this gives {@code 2m + 1} candidates, whose middle element is
+   * returned.
+   */
+  private double getMedian(ArrayList<Pair<Long, Double>> list, int index) {
+    int m = 0;
+    while (index + m + 1 < list.size()
+        && list.get(index + m + 1).getLeft() <= list.get(index).getLeft() + w) {
+      m++;
+    }
+    double[] x = new double[2 * m + 1];
+    x[0] = list.get(index).getRight();
+    for (int i = 1; i <= m; i++) {
+      x[i] =
+          list.get(index + i).getRight()
+              + smin * (list.get(index).getLeft() - list.get(index + i).getLeft());
+      x[i + m] =
+          list.get(index + i).getRight()
+              + smax * (list.get(index).getLeft() - list.get(index + i).getLeft());
+    }
+    Arrays.sort(x);
+    return x[m];
+  }
+
+  /**
+   * Clamps {@code mid} into the range reachable from the previous point: its value plus {@code
+   * smin} respectively {@code smax} times the elapsed time. Requires {@code index >= 1}.
+   */
+  private double getRepairedValue(ArrayList<Pair<Long, Double>> list, int index, double mid) {
+    double xmin =
+        list.get(index - 1).getRight()
+            + smin * (list.get(index).getLeft() - list.get(index - 1).getLeft());
+    double xmax =
+        list.get(index - 1).getRight()
+            + smax * (list.get(index).getLeft() - list.get(index - 1).getLeft());
+    double temp = mid;
+    temp = Math.min(xmax, temp);
+    temp = Math.max(xmin, temp);
+    return temp;
+  }
+
+  /**
+   * Finalises the point at {@code index} in place: the first point takes the median candidate
+   * directly, every other point takes the median candidate clamped against its predecessor.
+   */
+  private void local(ArrayList<Pair<Long, Double>> list, int index) {
+    double mid = getMedian(list, index);
+    // compute x_k'
+    if (index == 0) {
+      list.set(index, Pair.of(list.get(index).getLeft(), mid));
+    } else {
+      double temp = getRepairedValue(list, index, mid);
+      list.set(index, Pair.of(list.get(index).getLeft(), temp));
+    }
+  }
+
+  /** Overrides the default lower speed bound. */
+  public void setSmin(double smin) {
+    this.smin = smin;
+  }
+
+  /** Overrides the default upper speed bound. */
+  public void setSmax(double smax) {
+    this.smax = smax;
+  }
+
+  /** Overrides the default window length. */
+  public void setW(int w) {
+    this.w = w;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ScreenFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ScreenFill.java
new file mode 100644
index 0000000000..6badf0446f
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ScreenFill.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+import org.apache.iotdb.library.util.Util;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.math3.stat.descriptive.rank.Median;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+// Reference to Screen.java
+public class ScreenFill extends ValueFill {
+
+  private double smin, smax;
+  private double w;
+
+  public ScreenFill(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+    setParameters();
+  }
+
+  @Override
+  public long[] getTime() {
+    return super.getTime();
+  }
+
+  @Override
+  public double[] getFilled() {
+    return super.getFilled();
+  }
+
+  @Override
+  public void calMeanAndVar() {
+    super.calMeanAndVar();
+  }
+
+  @Override
+  public void fill() {
+    ArrayList<Pair<Long, Double>> ans = new ArrayList<>();
+    int currentIndex = 0;
+    while (currentIndex < n) {
+      ans.add(Pair.of(time[currentIndex], original[currentIndex]));
+      if (Double.isNaN(original[currentIndex])) {
+        int startIndex = currentIndex;
+        long fillTime = time[currentIndex];
+        int nextIndex = -1;
+        currentIndex++;
+        while (currentIndex < n && fillTime + w >= time[currentIndex]) {
+          ans.add(Pair.of(time[currentIndex], original[currentIndex]));
+          if (Double.isNaN(original[currentIndex]) && nextIndex == -1) {
+            nextIndex = currentIndex;
+          }
+          currentIndex++;
+        }
+        local(ans, startIndex);
+        if (nextIndex > 0) {
+          while (currentIndex > nextIndex) {
+            ans.remove(currentIndex - 1);
+            currentIndex--;
+          }
+        }
+      } else {
+        currentIndex++;
+      }
+    }
+    int k = 0;
+    for (Pair<Long, Double> p : ans) {
+      this.repaired[k] = p.getRight();
+      k++;
+    }
+  }
+
+  private double getMedian(ArrayList<Pair<Long, Double>> list, int index) {
+    int m = 0;
+    while (index + m + 1 < list.size()
+        && list.get(index + m + 1).getLeft() <= list.get(index).getLeft() + w) {
+      m++;
+    }
+    int count = 0;
+    for (int i = 1; i <= m; i++) {
+      if (!Double.isNaN(list.get(index + i).getRight())) {
+        count++;
+      }
+    }
+    double x[] = new double[2 * count];
+    int temp_count = 0;
+    for (int i = 1; i <= m; i++) {
+      if (!Double.isNaN(list.get(index + i).getRight())) {
+        x[temp_count] =
+            list.get(index + i).getRight()
+                + smin * (list.get(index).getLeft() - list.get(index + i).getLeft());
+        x[temp_count + count] =
+            list.get(index + i).getRight()
+                + smax * (list.get(index).getLeft() - list.get(index + i).getLeft());
+        temp_count++;
+      }
+    }
+    Arrays.sort(x);
+    return x[count];
+  }
+
+  private double getRepairedValue(ArrayList<Pair<Long, Double>> list, int index, double mid) {
+    double xmin =
+        list.get(index - 1).getRight()
+            + smin * (list.get(index).getLeft() - list.get(index - 1).getLeft());
+    double xmax =
+        list.get(index - 1).getRight()
+            + smax * (list.get(index).getLeft() - list.get(index - 1).getLeft());
+    double temp = mid;
+    temp = Math.min(xmax, temp);
+    temp = Math.max(xmin, temp);
+    return temp;
+  }
+
+  private void local(ArrayList<Pair<Long, Double>> list, int index) {
+    double mid = getMedian(list, index);
+    if (index == 0) {
+      list.set(index, Pair.of(list.get(index).getLeft(), mid));
+    } else {
+      double temp = getRepairedValue(list, index, mid);
+      list.set(index, Pair.of(list.get(index).getLeft(), temp));
+    }
+  }
+
+  private void setParameters() {
+    double[] speed = Util.speed(original, time);
+    Median median = new Median();
+    double mid = median.evaluate(speed);
+    double sigma = Util.mad(speed);
+    smax = mid + 3 * sigma;
+    smin = mid - 3 * sigma;
+    double[] interval = Util.variation(time);
+    w = 5 * median.evaluate(interval);
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimestampInterval.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimestampInterval.java
new file mode 100644
index 0000000000..ed1414b039
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimestampInterval.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import java.util.*;
+
+public class TimestampInterval {
+
+  protected int n;
+  protected long[] time;
+  protected double[] original;
+  protected long[] repaired;
+  protected long deltaT;
+  protected long start0;
+
+  public TimestampInterval(long[] time, double[] original) throws Exception {
+    // keep the time series
+    this.time = time;
+    this.original = original;
+    n = time.length;
+    repaired = new long[n];
+  }
+
+  // get standard interval
+  // -1 median -2 mode -3 cluster
+  public long getInterval(int mode) {
+    switch (mode) {
+      case -1:
+        this.deltaT = getIntervalByMedian();
+        break;
+      case -2:
+        this.deltaT = getIntervalByMode();
+        break;
+      case -3:
+        this.deltaT = getIntervalByCluster();
+        break;
+      default:
+        this.deltaT = mode;
+    }
+    return this.deltaT;
+  }
+
+  // median
+  private long getIntervalByMedian() {
+    ArrayList<Long> arrInterval = new ArrayList<>();
+    for (int i = 0; i < n - 2; i++) {
+      arrInterval.add(time[i + 1] - time[i]);
+    }
+    arrInterval.sort(Comparator.naturalOrder());
+    int m = n - 1;
+    if (m % 2 == 0) {
+      return (arrInterval.get(m / 2 - 1) + arrInterval.get(m / 2)) / 2;
+    }
+    return arrInterval.get(m / 2);
+  }
+
+  // mode
+  private long getIntervalByMode() {
+    repaired = time.clone();
+    // get a timestamp interval that appears most times
+    HashMap<Object, Integer> map = new LinkedHashMap<>();
+    int maxTimes = 0;
+    long maxTimesKey = 0;
+    for (int i = 0; i < n - 1; i++) {
+      map.put(time[i + 1] - time[i], map.getOrDefault(time[i + 1] - time[i], 0) + 1);
+    }
+    for (Map.Entry<Object, Integer> entry : map.entrySet()) {
+      Object key = entry.getKey();
+      Integer value = entry.getValue();
+      if (value > maxTimes) {
+        maxTimes = value;
+        maxTimesKey = (long) key;
+      }
+    }
+    return maxTimesKey;
+  }
+
+  // cluster
+  private long getIntervalByCluster() {
+    // get array of timestamp intervals
+    HashMap<Object, Integer> map = new LinkedHashMap<>();
+    long maxInterval = 0;
+    long minInterval = 9999999;
+    long[] intervals = new long[n];
+    for (int i = 0; i < n - 1; i++) {
+      intervals[i] = time[i + 1] - time[i];
+      if (intervals[i] > maxInterval) {
+        maxInterval = intervals[i];
+      }
+      if (intervals[i] < minInterval) {
+        minInterval = intervals[i];
+      }
+    }
+    int k = 3;
+    long[] means = new long[k];
+    for (int i = 0; i < k; i++) {
+      means[i] = minInterval + (i + 1) * (maxInterval - minInterval) / (k + 1);
+    }
+    long[][] distance = new long[n - 1][k];
+    int[] results = new int[n - 1];
+    for (int i = 0; i < n - 1; i++) {
+      results[i] = -1;
+    }
+    boolean changed = true;
+    int[] cnts = new int[k];
+    int maxClusterId = 0;
+    while (changed) {
+      changed = false;
+      for (int i = 0; i < n - 1; i++) {
+        long minDis = 99999999;
+        int minDisId = 0;
+        for (int j = 0; j < k; j++) {
+          distance[i][j] = Math.abs(intervals[i] - means[j]);
+          if (distance[i][j] < minDis) {
+            minDis = distance[i][j];
+            minDisId = j;
+          }
+        }
+        if (minDisId != results[i]) {
+          changed = true;
+          results[i] = minDisId;
+        }
+      }
+      int maxCluterCnt = 0;
+      for (int i = 0; i < k; i++) {
+        long sum = 0;
+        cnts[i] = 0;
+        for (int j = 0; j < n - 1; j++) {
+          if (results[j] == i) {
+            sum += intervals[j];
+            cnts[i] += 1;
+          }
+        }
+        if (cnts[i] != 0) {
+          means[i] = sum / cnts[i];
+          if (cnts[i] > maxCluterCnt) {
+            maxClusterId = i;
+            maxCluterCnt = cnts[i];
+          }
+        }
+      }
+    }
+    return means[maxClusterId];
+  }
+
+  // get standard starting point
+  public long getStart0(int mode) {
+    switch (mode) {
+      case 1:
+        this.start0 = getStart0ByLinear();
+        break;
+      case 2:
+        this.start0 = getStart0ByMode();
+        break;
+    }
+    return this.start0;
+  }
+
+  private long getStart0ByLinear() {
+    long sum_ = 0;
+    for (int i = 0; i < n; i++) {
+      sum_ += time[i];
+      sum_ -= this.deltaT * i;
+    }
+    return sum_ / n;
+  }
+
+  private long getStart0ByMode() {
+    long[] modn = new long[n];
+    // get mode that appears most times
+    HashMap<Object, Integer> mapn = new LinkedHashMap<>();
+    for (int i = 0; i < n; i++) {
+      modn[i] = time[i] % this.deltaT;
+      mapn.put(modn[i], mapn.getOrDefault(modn[i], 0) + 1);
+    }
+    int maxTimesn = 0;
+    long maxTimesMode = 0;
+    for (Map.Entry<Object, Integer> entry : mapn.entrySet()) {
+      Object key = entry.getKey();
+      Integer value = entry.getValue();
+      if (value > maxTimesn) {
+        maxTimesn = value;
+        maxTimesMode = (long) key;
+      }
+    }
+    long st = 0;
+    for (int i = 0; i < n; i++) {
+      if (modn[i] == maxTimesMode) {
+        st = time[i];
+        while (st > time[0]) {
+          st -= deltaT;
+        }
+      }
+    }
+    return st;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimestampRepair.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimestampRepair.java
new file mode 100644
index 0000000000..54e7769401
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimestampRepair.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+import org.apache.iotdb.library.util.Util;
+
+import java.util.*;
+
+/**
+ * Repairs the timestamps of a series by aligning its points to a regular grid {@code start0 + k *
+ * deltaT}. The alignment is found by dynamic programming over three operations: moving a point
+ * onto a grid slot, adding a grid slot that has no point, and deleting a point.
+ */
+public class TimestampRepair {
+
+  protected int n;
+  protected long[] time;
+  protected double[] original;
+  protected long[] repaired;
+  protected double[] repairedValue;
+  protected long deltaT;
+  protected long start0;
+
+  /**
+   * Loads the series and estimates the grid.
+   *
+   * @param dataIterator source rows; non-finite values are stored as NaN
+   * @param intervalMode passed to {@code TimestampInterval.getInterval}
+   * @param startPointMode passed to {@code TimestampInterval.getStart0}
+   */
+  public TimestampRepair(RowIterator dataIterator, int intervalMode, int startPointMode)
+      throws Exception {
+    ArrayList<Long> timeList = new ArrayList<>();
+    ArrayList<Double> originList = new ArrayList<>();
+    while (dataIterator.hasNextRow()) {
+      Row row = dataIterator.next();
+      double v = Util.getValueAsDouble(row);
+      timeList.add(row.getTime());
+      originList.add(Double.isFinite(v) ? v : Double.NaN);
+    }
+    time = Util.toLongArray(timeList);
+    original = Util.toDoubleArray(originList);
+    n = time.length;
+    // dpRepair() leaves series of at most two points untouched, so no grid is needed for them
+    if (n > 2) {
+      TimestampInterval trParam = new TimestampInterval(time, original);
+      this.deltaT = trParam.getInterval(intervalMode);
+      this.start0 = trParam.getStart0(startPointMode);
+    }
+  }
+
+  /** Publishes the input unchanged as the result. */
+  private void noRepair() {
+    repaired = time.clone();
+    repairedValue = original.clone();
+  }
+
+  /**
+   * Computes the repaired series; read it with {@link #getRepaired()} and {@link
+   * #getRepairedValue()}.
+   *
+   * <p>Series of at most two points are returned unchanged. Otherwise the result has one entry per
+   * grid slot: a slot matched with an input point carries that point's value, an added slot
+   * carries NaN, and deleted input points do not appear. Moving a point costs the distance it is
+   * moved; adding or deleting costs a fixed 100000.
+   *
+   * <p>The cost and step tables take memory proportional to (grid slots) x (input points).
+   */
+  public void dpRepair() {
+    if (time.length <= 2) {
+      noRepair();
+      return;
+    }
+    // the division is a long division, so this counts the grid slots from start0 up to time[n-1]
+    int n_ = (int) Math.ceil((time[n - 1] - start0) / deltaT + 1);
+    repaired = new long[n_];
+    repairedValue = new double[n_];
+    int m_ = this.n;
+    // f[i][j]: minimum cost of aligning the first i grid slots with the first j input points
+    long[][] f = new long[n_ + 1][m_ + 1];
+    // steps[i][j]: last operation on the optimal path; 0 = match/move, 1 = add slot, 2 = delete
+    int[][] steps = new int[n_ + 1][m_ + 1];
+    // dynamic programming
+    int addCostRatio = 100000;
+    for (int i = 0; i < n_ + 1; i++) {
+      f[i][0] = (long) addCostRatio * i;
+      steps[i][0] = 1;
+    }
+    for (int i = 0; i < m_ + 1; i++) {
+      f[0][i] = (long) addCostRatio * i;
+      steps[0][i] = 2;
+    }
+
+    for (int i = 1; i < n_ + 1; i++) {
+      for (int j = 1; j < m_ + 1; j++) {
+
+        if (time[j - 1] == start0 + (i - 1) * deltaT) {
+          // if timestamps are equal, then temporary minimum operation time equals to matched
+          // operations before these points
+          f[i][j] = f[i - 1][j - 1];
+          steps[i][j] = 0;
+        } else {
+          // addition or deletion
+          if (f[i - 1][j] < f[i][j - 1]) {
+            f[i][j] = f[i - 1][j] + addCostRatio * 1;
+            steps[i][j] = 1;
+          } else {
+            f[i][j] = f[i][j - 1] + addCostRatio * 1;
+            steps[i][j] = 2;
+          }
+          // replacement
+          long modifyResult = f[i - 1][j - 1] + Math.abs(time[j - 1] - start0 - (i - 1) * deltaT);
+          if (modifyResult < f[i][j]) {
+            f[i][j] = modifyResult;
+            steps[i][j] = 0;
+          }
+        }
+      }
+    }
+
+    // walk the optimal path backwards and emit one entry per grid slot
+    int i = n_;
+    int j = m_;
+    while (i >= 1 && j >= 1) {
+      long ps = start0 + (i - 1) * deltaT;
+      if (steps[i][j] == 0) {
+        repaired[i - 1] = ps;
+        repairedValue[i - 1] = original[j - 1];
+        i--;
+        j--;
+      } else if (steps[i][j] == 1) {
+        // add points
+        repaired[i - 1] = ps;
+        repairedValue[i - 1] = Double.NaN;
+        i--;
+      } else {
+        // delete points
+        j--;
+      }
+    }
+    // input exhausted: every remaining leading slot is an added point (steps[i][0] == 1)
+    while (i >= 1) {
+      repaired[i - 1] = start0 + (i - 1) * deltaT;
+      repairedValue[i - 1] = Double.NaN;
+      i--;
+    }
+  }
+
+  public double[] getRepairedValue() {
+    return repairedValue;
+  }
+
+  public long[] getRepaired() {
+    return repaired;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ValueFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ValueFill.java
new file mode 100644
index 0000000000..b3bdb33eec
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ValueFill.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+import org.apache.iotdb.library.util.Util;
+
+import java.util.ArrayList;
+
+public abstract class ValueFill {
+  protected int n;
+  protected long[] time;
+  protected double[] original;
+  protected double[] repaired;
+  protected double mean = 0;
+  protected double var = 0;
+  protected int not_nan_number = 0;
+
+  public ValueFill(RowIterator dataIterator) throws Exception {
+    ArrayList<Long> timeList = new ArrayList<>();
+    ArrayList<Double> originList = new ArrayList<>();
+    while (dataIterator.hasNextRow()) {
+      Row row = dataIterator.next();
+      Double v = Util.getValueAsDouble(row);
+      timeList.add(row.getTime());
+      if (!Double.isFinite(v)) {
+        originList.add(Double.NaN);
+      } else {
+        originList.add(v);
+      }
+    }
+    time = Util.toLongArray(timeList);
+    original = Util.toDoubleArray(originList);
+    n = time.length;
+    repaired = new double[n];
+  }
+
+  public abstract void fill();
+
+  public long[] getTime() {
+    return time;
+  }
+
+  public double[] getFilled() {
+    return repaired;
+  };
+
+  public void calMeanAndVar() {
+    for (double v : original) {
+      if (!Double.isNaN(v)) {
+        mean += v;
+        not_nan_number += 1;
+      }
+    }
+    assert not_nan_number > 0 : "All values are NaN";
+    mean /= not_nan_number;
+    for (double v : original) {
+      if (!Double.isNaN(v)) {
+        var += (v - mean) * (v - mean);
+      }
+    }
+    var /= not_nan_number;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ValueRepair.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ValueRepair.java
new file mode 100644
index 0000000000..5b47ac776c
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ValueRepair.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+import org.apache.iotdb.library.util.Util;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Scanner;
+
+public abstract class ValueRepair {
+
+  protected int n;
+  protected long[] time;
+  protected double[] original;
+  protected double[] repaired;
+
+  public ValueRepair(RowIterator dataIterator) throws Exception {
+    ArrayList<Long> timeList = new ArrayList<>();
+    ArrayList<Double> originList = new ArrayList<>();
+    while (dataIterator.hasNextRow()) {
+      Row row = dataIterator.next();
+      Double v = Util.getValueAsDouble(row);
+      timeList.add(row.getTime());
+      if (!Double.isFinite(v)) {
+        originList.add(Double.NaN);
+      } else {
+        originList.add(v);
+      }
+    }
+    time = Util.toLongArray(timeList);
+    original = Util.toDoubleArray(originList);
+    n = time.length;
+    repaired = new double[n];
+    processNaN();
+  }
+
+  public ValueRepair(String filename) throws Exception {
+    Scanner sc = new Scanner(new File(filename));
+    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    sc.useDelimiter("\\s*(,|\\r|\\n)\\s*");
+    sc.nextLine();
+    ArrayList<Long> timeList = new ArrayList<>();
+    ArrayList<Double> originList = new ArrayList<>();
+    while (sc.hasNext()) {
+      timeList.add(format.parse(sc.next()).getTime());
+      Double v = sc.nextDouble();
+      if (!Double.isFinite(v)) {
+        originList.add(Double.NaN);
+      } else {
+        originList.add(v);
+      }
+    }
+    time = Util.toLongArray(timeList);
+    original = Util.toDoubleArray(originList);
+    n = time.length;
+    repaired = new double[n];
+    processNaN();
+  }
+
+  public abstract void repair();
+
+  private void processNaN() throws Exception {
+    int index1 = 0, index2;
+    while (index1 < n && Double.isNaN(original[index1])) {
+      index1++;
+    }
+    index2 = index1 + 1;
+    while (index2 < n && Double.isNaN(original[index2])) {
+      index2++;
+    }
+    if (index2 >= n) {
+      throw new Exception("At least two non-NaN values are needed");
+    }
+    for (int i = 0; i < index2; i++) {
+      original[i] =
+          original[index1]
+              + (original[index2] - original[index1])
+                  * (time[i] - time[index1])
+                  / (time[index2] - time[index1]);
+    }
+    for (int i = index2 + 1; i < n; i++) {
+      if (!Double.isNaN(original[i])) {
+        index1 = index2;
+        index2 = i;
+        for (int j = index1 + 1; j < index2; j++) {
+          original[j] =
+              original[index1]
+                  + (original[index2] - original[index1])
+                      * (time[j] - time[index1])
+                      / (time[index2] - time[index1]);
+        }
+      }
+    }
+    for (int i = index2 + 1; i < n; i++) {
+      original[i] =
+          original[index1]
+              + (original[index2] - original[index1])
+                  * (time[i] - time[index1])
+                  / (time[index2] - time[index1]);
+    }
+  }
+
+  public long[] getTime() {
+    return time;
+  }
+
+  public double[] getRepaired() {
+    return repaired;
+  }
+}
diff --git a/library-udf/src/test/java/org/apache/iotdb/library/drepair/DRepairTests.java b/library-udf/src/test/java/org/apache/iotdb/library/drepair/DRepairTests.java
new file mode 100644
index 0000000000..4bf48b4b37
--- /dev/null
+++ b/library-udf/src/test/java/org/apache/iotdb/library/drepair/DRepairTests.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.drepair;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.integration.env.ConfigFactory;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Smoke tests for the data-repairing UDFs (timestamp repair, value fill, value repair).
+ *
+ * <p>Each test only checks that the query executes without an {@link SQLException}; the returned
+ * rows are not inspected.
+ */
+public class DRepairTests {
+  protected static final int ITERATION_TIMES = 100_000;
+  protected static final int DELTA_T = 100;
+
+  private static final String JDBC_URL = Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/";
+
+  private static final float oldUdfCollectorMemoryBudgetInMB =
+      IoTDBDescriptor.getInstance().getConfig().getUdfCollectorMemoryBudgetInMB();
+  private static final float oldUdfTransformerMemoryBudgetInMB =
+      IoTDBDescriptor.getInstance().getConfig().getUdfTransformerMemoryBudgetInMB();
+  private static final float oldUdfReaderMemoryBudgetInMB =
+      IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(5)
+        .setUdfTransformerMemoryBudgetInMB(5)
+        .setUdfReaderMemoryBudgetInMB(5);
+    EnvFactory.getEnv().initBeforeClass();
+    createTimeSeries();
+    generateData();
+    registerUDF();
+  }
+
+  /** Creates one PLAIN-encoded, uncompressed series per UDF family. */
+  private static void createTimeSeries() throws MetadataException {
+    IoTDB.metaManager.setStorageGroup(new PartialPath("root.vehicle"));
+    // test series for TimeStampRepair
+    createPlainSeries("root.vehicle.d1.s1", TSDataType.INT32);
+    // test series for ValueFill
+    createPlainSeries("root.vehicle.d2.s1", TSDataType.INT64);
+    // test series for ValueRepair
+    createPlainSeries("root.vehicle.d3.s1", TSDataType.FLOAT);
+  }
+
+  private static void createPlainSeries(String path, TSDataType dataType)
+      throws MetadataException {
+    IoTDB.metaManager.createTimeseries(
+        new PartialPath(path), dataType, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null);
+  }
+
+  /** Opens a JDBC connection to the local test server with the default credentials. */
+  private static Connection openConnection() throws SQLException {
+    return DriverManager.getConnection(JDBC_URL, "root", "root");
+  }
+
+  /**
+   * Builds an insert statement for column {@code s1} of {@code device}. {@link Locale#ROOT} keeps
+   * the numeric literals independent of the JVM's default locale (no comma decimal separators).
+   */
+  private static String insertSql(String device, long timestamp, double value) {
+    return String.format(
+        Locale.ROOT, "insert into %s(timestamp,s1) values(%d,%f)", device, timestamp, value);
+  }
+
+  /**
+   * Random integral value derived from the borders {@code x} and {@code y}.
+   *
+   * <p>NOTE(review): with x = -100 and y = 100 this expression only yields values in [-100, -1],
+   * not the full [x, y] range the "borders" comment suggests. Kept as-is so the generated data is
+   * unchanged; confirm the intended distribution.
+   */
+  private static double randomValue(double x, double y) {
+    return Math.floor(x + Math.random() * y % (y - x + 1));
+  }
+
+  /** Fills the three test series with {@link #ITERATION_TIMES} points each. */
+  private static void generateData() {
+    double x = -100d, y = 100d; // borders of random value
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement()) {
+      // d1: regular timestamps, about 1% of them shifted by up to half an interval.
+      for (int i = 1; i <= ITERATION_TIMES; ++i) {
+        long timestamp = (long) i * DELTA_T;
+        if (Math.random() >= 0.99) {
+          timestamp += (long) Math.floor((Math.random() - 0.5) * DELTA_T);
+        }
+        statement.execute(insertSql("root.vehicle.d1", timestamp, randomValue(x, y)));
+      }
+      // d2: regular timestamps, about 3% of the values replaced by NaN.
+      for (int i = 1; i <= ITERATION_TIMES; ++i) {
+        double value = Math.random() < 0.97 ? randomValue(x, y) : Double.NaN;
+        statement.execute(insertSql("root.vehicle.d2", (long) i * DELTA_T, value));
+      }
+      // d3: linear trend plus uniform noise.
+      for (int i = 1; i <= ITERATION_TIMES; ++i) {
+        double value = i / (double) ITERATION_TIMES * (y - x) + (Math.random() - 0.5) * (y - x);
+        statement.execute(insertSql("root.vehicle.d3", (long) i * DELTA_T, value));
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  /** Registers the UDFs under test; the classes live in the {@code drepair} package. */
+  private static void registerUDF() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "create function timestamprepair as 'org.apache.iotdb.library.drepair.UDTFTimestampRepair'");
+      statement.execute(
+          "create function valuefill as 'org.apache.iotdb.library.drepair.UDTFValueFill'");
+      statement.execute(
+          "create function valuerepair as 'org.apache.iotdb.library.drepair.UDTFValueRepair'");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(oldUdfCollectorMemoryBudgetInMB)
+        .setUdfTransformerMemoryBudgetInMB(oldUdfTransformerMemoryBudgetInMB)
+        .setUdfReaderMemoryBudgetInMB(oldUdfReaderMemoryBudgetInMB);
+  }
+
+  /**
+   * Executes {@code sqlStr} on a fresh connection and fails the test if an {@link SQLException}
+   * is raised while executing the query or releasing its resources.
+   */
+  private static void assertQuerySucceeds(String sqlStr) {
+    try (Connection connection = openConnection();
+        Statement statement = connection.createStatement();
+        ResultSet ignored = statement.executeQuery(sqlStr)) {
+      // Reaching this point means the query was accepted and executed.
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void testTimestampRepair1() {
+    assertQuerySucceeds(
+        String.format(
+            Locale.ROOT,
+            "select timestamprepair(d1.s1,'interval'='%d') from root.vehicle",
+            DELTA_T));
+  }
+
+  @Test
+  public void testTimestampRepair2() {
+    assertQuerySucceeds("select timestamprepair(d1.s1,'method'='median') from root.vehicle");
+  }
+
+  @Test
+  public void testTimestampRepair3() {
+    assertQuerySucceeds("select timestamprepair(d1.s1,'method'='mode') from root.vehicle");
+  }
+
+  @Test
+  public void testTimestampRepair4() {
+    assertQuerySucceeds("select timestamprepair(d1.s1,'method'='cluster') from root.vehicle");
+  }
+
+  @Test
+  public void testValueFill1() {
+    assertQuerySucceeds("select valuefill(d2.s1,'method'='previous') from root.vehicle");
+  }
+
+  @Test
+  public void testValueFill2() {
+    assertQuerySucceeds("select valuefill(d2.s1,'method'='linear') from root.vehicle");
+  }
+
+  @Test
+  public void testValueFill3() {
+    assertQuerySucceeds("select valuefill(d2.s1,'method'='likelihood') from root.vehicle");
+  }
+
+  @Test
+  public void testValueFill4() {
+    assertQuerySucceeds("select valuefill(d2.s1,'method'='ar') from root.vehicle");
+  }
+
+  @Test
+  public void testValueFill5() {
+    assertQuerySucceeds("select valuefill(d2.s1,'method'='ma') from root.vehicle");
+  }
+
+  @Test
+  public void testValueFill6() {
+    assertQuerySucceeds("select valuefill(d2.s1,'method'='screen') from root.vehicle");
+  }
+
+  @Test
+  public void testValueRepair1() {
+    assertQuerySucceeds("select valuerepair(d3.s1,'method'='screen') from root.vehicle");
+  }
+
+  @Test
+  public void testValueRepair2() {
+    assertQuerySucceeds("select valuerepair(d3.s1,'method'='lsgreedy') from root.vehicle");
+  }
+}