You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/08 12:39:11 UTC
[iotdb] branch master updated: [IOTDB-2304]Library-UDF Data Repairing Functions (#4833)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 87c8ab4ac8 [IOTDB-2304]Library-UDF Data Repairing Functions (#4833)
87c8ab4ac8 is described below
commit 87c8ab4ac830be9f3b28a0548035975f563eb5c7
Author: Pengyu Chen <48...@users.noreply.github.com>
AuthorDate: Fri Apr 8 20:39:07 2022 +0800
[IOTDB-2304]Library-UDF Data Repairing Functions (#4833)
---
docs/zh/UserGuide/UDF-Library/Data-Repairing.md | 1 -
.../iotdb/library/drepair/UDTFTimestampRepair.java | 102 +++++++
.../iotdb/library/drepair/UDTFValueFill.java | 104 +++++++
.../iotdb/library/drepair/UDTFValueRepair.java | 122 ++++++++
.../apache/iotdb/library/drepair/util/ARFill.java | 93 ++++++
.../iotdb/library/drepair/util/LikelihoodFill.java | 128 +++++++++
.../iotdb/library/drepair/util/LinearFill.java | 57 ++++
.../iotdb/library/drepair/util/LsGreedy.java | 141 +++++++++
.../apache/iotdb/library/drepair/util/MAFill.java | 57 ++++
.../iotdb/library/drepair/util/MeanFill.java | 40 +++
.../iotdb/library/drepair/util/PreviousFill.java | 49 ++++
.../apache/iotdb/library/drepair/util/Screen.java | 138 +++++++++
.../iotdb/library/drepair/util/ScreenFill.java | 154 ++++++++++
.../library/drepair/util/TimestampInterval.java | 213 ++++++++++++++
.../library/drepair/util/TimestampRepair.java | 154 ++++++++++
.../iotdb/library/drepair/util/ValueFill.java | 81 ++++++
.../iotdb/library/drepair/util/ValueRepair.java | 130 +++++++++
.../apache/iotdb/library/drepair/DRepairTests.java | 317 +++++++++++++++++++++
18 files changed, 2080 insertions(+), 1 deletion(-)
diff --git a/docs/zh/UserGuide/UDF-Library/Data-Repairing.md b/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
index 7a2c7637e5..e75d7c1782 100644
--- a/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
+++ b/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
@@ -135,7 +135,6 @@ select timestamprepair(s1) from root.test.d2
**备注:** AR 模型采用 AR(1),时序列需满足自相关条件,否则将输出单个数据点 (0, 0.0).
### 使用示例
-
#### 使用 linear 方法进行填补
当`method`缺省或取值为 'linear' 时,本函数将使用线性插值方法进行填补。
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java
new file mode 100644
index 0000000000..85bcbc3bb0
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFTimestampRepair.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.drepair;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.library.drepair.util.TimestampRepair;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * UDTF that repairs the timestamps of a single numeric input series.
+ *
+ * <p>Parameters read from the query:
+ *
+ * <ul>
+ *   <li>{@code interval}: non-negative integer. A positive value is passed to {@link
+ *       TimestampRepair} as-is; {@code 0} (the default) means "not specified", in which case
+ *       {@code method} selects how the interval is determined.
+ *   <li>{@code method}: {@code Median} (default), {@code Mode} or {@code Cluster}, compared
+ *       case-insensitively. Only consulted when {@code interval} is not positive.
+ * </ul>
+ *
+ * <p>The whole series is processed in one window and the output type equals the input type.
+ */
+public class UDTFTimestampRepair implements UDTF {
+  String intervalMethod;
+  int interval;
+  // Positive: the user-supplied interval. Negative: method selector (-1 Median, -2 Mode, -3
+  // Cluster) as handed to TimestampRepair.
+  int intervalMode;
+
+  /**
+   * Requires exactly one numeric input series and a non-negative {@code interval}.
+   *
+   * @throws Exception if validation fails
+   */
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(
+            0, TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT32, TSDataType.INT64)
+        .validate(
+            x -> (Integer) x >= 0,
+            // 0 is accepted (it is the "unset" default), so the message must not say "positive".
+            "Interval should be a non-negative integer.",
+            validator.getParameters().getIntOrDefault("interval", 0));
+  }
+
+  /**
+   * Configures a single all-encompassing sliding window and resolves the interval mode.
+   *
+   * @throws Exception if no positive interval is given and {@code method} is not one of the
+   *     supported names
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
+        .setOutputDataType(parameters.getDataType(0));
+    intervalMethod = parameters.getStringOrDefault("method", "Median");
+    interval = parameters.getIntOrDefault("interval", 0);
+    if (interval > 0) {
+      // An explicit interval takes precedence over any method name.
+      intervalMode = interval;
+    } else if ("Median".equalsIgnoreCase(intervalMethod)) {
+      intervalMode = -1;
+    } else if ("Mode".equalsIgnoreCase(intervalMethod)) {
+      intervalMode = -2;
+    } else if ("Cluster".equalsIgnoreCase(intervalMethod)) {
+      intervalMode = -3;
+    } else {
+      throw new Exception("Illegal method.");
+    }
+  }
+
+  /**
+   * Runs the repair on the window and emits the repaired (timestamp, value) pairs, casting the
+   * values back to the input data type.
+   *
+   * @throws Exception if the repair fails or the input data type is not numeric
+   */
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    // NOTE(review): the meaning of the constant 2 is defined by TimestampRepair - confirm there.
+    TimestampRepair ts = new TimestampRepair(rowWindow.getRowIterator(), intervalMode, 2);
+    ts.dpRepair();
+    long[] timestamp = ts.getRepaired();
+    double[] value = ts.getRepairedValue();
+    switch (rowWindow.getDataType(0)) {
+      case DOUBLE:
+        for (int i = 0; i < timestamp.length; i++) {
+          collector.putDouble(timestamp[i], value[i]);
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < timestamp.length; i++) {
+          collector.putFloat(timestamp[i], (float) value[i]);
+        }
+        break;
+      case INT32:
+        for (int i = 0; i < timestamp.length; i++) {
+          collector.putInt(timestamp[i], (int) value[i]);
+        }
+        break;
+      case INT64:
+        for (int i = 0; i < timestamp.length; i++) {
+          collector.putLong(timestamp[i], (long) value[i]);
+        }
+        break;
+      default:
+        throw new Exception("Unsupported data type: " + rowWindow.getDataType(0));
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFValueFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFValueFill.java
new file mode 100644
index 0000000000..9d148b573b
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFValueFill.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.library.drepair.util.ARFill;
+import org.apache.iotdb.library.drepair.util.LikelihoodFill;
+import org.apache.iotdb.library.drepair.util.LinearFill;
+import org.apache.iotdb.library.drepair.util.MeanFill;
+import org.apache.iotdb.library.drepair.util.PreviousFill;
+import org.apache.iotdb.library.drepair.util.ScreenFill;
+import org.apache.iotdb.library.drepair.util.ValueFill;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * UDTF that fills missing points of a single numeric series.
+ *
+ * <p>The {@code method} parameter (default {@code linear}) picks the filling strategy; it is
+ * matched case-insensitively against {@code previous}, {@code linear}, {@code mean}, {@code ar},
+ * {@code screen} and {@code likelihood}. The output type equals the input type.
+ */
+public class UDTFValueFill implements UDTF {
+  private String method;
+
+  /** Requires exactly one input series of a numeric type. */
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(
+            0, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.INT32, TSDataType.INT64);
+  }
+
+  /** Processes the whole series as one window and remembers the requested method name. */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
+        .setOutputDataType(parameters.getDataType(0));
+    method = parameters.getStringOrDefault("method", "linear");
+  }
+
+  /**
+   * Fills the window with the selected strategy and emits the result, rounding to the nearest
+   * integer for INT32/INT64 outputs.
+   *
+   * @throws Exception if the method name is unknown or the data type is not numeric
+   */
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    ValueFill filler = selectFiller(rowWindow);
+    filler.fill();
+    double[] filledValues = filler.getFilled();
+    long[] timestamps = filler.getTime();
+    switch (rowWindow.getDataType(0)) {
+      case DOUBLE:
+        for (int idx = 0; idx < timestamps.length; idx++) {
+          collector.putDouble(timestamps[idx], filledValues[idx]);
+        }
+        break;
+      case FLOAT:
+        for (int idx = 0; idx < timestamps.length; idx++) {
+          float narrowed = (float) filledValues[idx];
+          collector.putFloat(timestamps[idx], narrowed);
+        }
+        break;
+      case INT32:
+        for (int idx = 0; idx < timestamps.length; idx++) {
+          long rounded = Math.round(filledValues[idx]);
+          collector.putInt(timestamps[idx], (int) rounded);
+        }
+        break;
+      case INT64:
+        for (int idx = 0; idx < timestamps.length; idx++) {
+          long rounded = Math.round(filledValues[idx]);
+          collector.putLong(timestamps[idx], rounded);
+        }
+        break;
+      default:
+        throw new Exception();
+    }
+  }
+
+  /** Instantiates the filler that corresponds to the configured method name. */
+  private ValueFill selectFiller(RowWindow rowWindow) throws Exception {
+    if ("previous".equalsIgnoreCase(method)) {
+      return new PreviousFill(rowWindow.getRowIterator());
+    }
+    if ("linear".equalsIgnoreCase(method)) {
+      return new LinearFill(rowWindow.getRowIterator());
+    }
+    if ("mean".equalsIgnoreCase(method)) {
+      return new MeanFill(rowWindow.getRowIterator());
+    }
+    if ("ar".equalsIgnoreCase(method)) {
+      return new ARFill(rowWindow.getRowIterator());
+    }
+    if ("screen".equalsIgnoreCase(method)) {
+      return new ScreenFill(rowWindow.getRowIterator());
+    }
+    if ("likelihood".equalsIgnoreCase(method)) {
+      return new LikelihoodFill(rowWindow.getRowIterator());
+    }
+    throw new Exception("Illegal method");
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFValueRepair.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFValueRepair.java
new file mode 100644
index 0000000000..a7cd8ca774
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFValueRepair.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.drepair;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.library.drepair.util.LsGreedy;
+import org.apache.iotdb.library.drepair.util.Screen;
+import org.apache.iotdb.library.drepair.util.ValueRepair;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/** This function is used to repair the value of the time series. */
+public class UDTFValueRepair implements UDTF {
+ String method;
+ double minSpeed;
+ double maxSpeed;
+ double center;
+ double sigma;
+
+ @Override
+ public void validate(UDFParameterValidator validator) throws Exception {
+ validator
+ .validateInputSeriesNumber(1)
+ .validateInputSeriesDataType(
+ 0, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.INT32, TSDataType.INT64)
+ .validate(
+ x -> (double) x > 0,
+ "Parameter $sigma$ should be larger than 0.",
+ validator.getParameters().getDoubleOrDefault("sigma", 1.0))
+ .validate(
+ params -> (double) params[0] < (double) params[1],
+ "parameter $minSpeed$ should be smaller than $maxSpeed$.",
+ validator.getParameters().getDoubleOrDefault("minSpeed", -1),
+ validator.getParameters().getDoubleOrDefault("maxSpeed", 1));
+ }
+
+ @Override
+ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+ throws Exception {
+ configurations
+ .setAccessStrategy(new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
+ .setOutputDataType(parameters.getDataType(0));
+ method = parameters.getStringOrDefault("method", "screen");
+ minSpeed = parameters.getDoubleOrDefault("minSpeed", Double.NaN);
+ maxSpeed = parameters.getDoubleOrDefault("maxSpeed", Double.NaN);
+ center = parameters.getDoubleOrDefault("center", 0);
+ sigma = parameters.getDoubleOrDefault("sigma", Double.NaN);
+ }
+
+ @Override
+ public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+ ValueRepair vr;
+ if ("screen".equalsIgnoreCase(method)) {
+ Screen screen = new Screen(rowWindow.getRowIterator());
+ if (!Double.isNaN(minSpeed)) {
+ screen.setSmin(minSpeed);
+ }
+ if (!Double.isNaN(maxSpeed)) {
+ screen.setSmax(maxSpeed);
+ }
+ vr = screen;
+ } else if ("lsgreedy".equalsIgnoreCase(method)) {
+ LsGreedy lsGreedy = new LsGreedy(rowWindow.getRowIterator());
+ if (!Double.isNaN(sigma)) {
+ lsGreedy.setSigma(sigma);
+ }
+ lsGreedy.setCenter(center);
+ vr = lsGreedy;
+ } else {
+ throw new Exception("Illegal method.");
+ }
+ vr.repair();
+ double[] repaired = vr.getRepaired();
+ long[] time = vr.getTime();
+ switch (rowWindow.getDataType(0)) {
+ case DOUBLE:
+ for (int i = 0; i < time.length; i++) {
+ collector.putDouble(time[i], repaired[i]);
+ }
+ break;
+ case FLOAT:
+ for (int i = 0; i < time.length; i++) {
+ collector.putFloat(time[i], (float) repaired[i]);
+ }
+ break;
+ case INT32:
+ for (int i = 0; i < time.length; i++) {
+ collector.putInt(time[i], (int) Math.round(repaired[i]));
+ }
+ break;
+ case INT64:
+ for (int i = 0; i < time.length; i++) {
+ collector.putLong(time[i], Math.round(repaired[i]));
+ }
+ break;
+ default:
+ throw new Exception();
+ }
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ARFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ARFill.java
new file mode 100644
index 0000000000..90e5b7c783
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ARFill.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+/**
+ * Fills missing (NaN) points with a first-order autoregressive model.
+ *
+ * <p>The coefficient {@code theta} is estimated as {@code sum(x_t * x_{t+1}) / sum(x_t^2)} over
+ * adjacent pairs, with NaN treated as 0. A missing point is then predicted as {@code theta *
+ * previous + meanEpsilon}, where {@code meanEpsilon} is the mean residual over adjacent pairs
+ * that are both present. If the model cannot be fitted ({@code theta} is not below 1, or is NaN),
+ * the output is the single point (0, 0.0).
+ */
+public class ARFill extends ValueFill {
+  // TODO Higher order AR regression
+  private int order = 1;
+  private double theta = 1e10;
+
+  public ARFill(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+    calMeanAndVar();
+  }
+
+  /** Stores the requested order; {@link #fill()} currently always fits an AR(1) model. */
+  public void setOrder(int order) {
+    this.order = order;
+  }
+
+  /** Fits the AR(1) model and writes the filled series into {@code repaired}. */
+  @Override
+  public void fill() {
+    // compute \sum x_t * x_{t+1} and \sum x_t^2, treating missing values as 0
+    double acf = 0;
+    double factor = 0;
+    for (int i = 0; i < original.length - 1; i++) {
+      double left = Double.isNaN(original[i]) ? 0 : original[i];
+      double right = Double.isNaN(original[i + 1]) ? 0 : original[i + 1];
+      acf += left * right;
+      factor += left * left;
+    }
+    this.theta = acf / factor;
+    // Explicit check instead of `assert`: assertions are disabled unless the JVM runs with -ea,
+    // so an assert-based guard would be skipped in production. The negated form also rejects a
+    // NaN theta (factor == 0).
+    if (!(this.theta < 1)) {
+      System.out.println("Cannot fit AR(1) model. Please try another method.");
+      this.time = new long[] {0};
+      this.repaired = new double[] {0D};
+      return;
+    }
+    // mean residual epsilon_t = x_{t+1} - theta * x_t over fully observed adjacent pairs
+    double meanEpsilon = 0;
+    double cntEpsilon = 0;
+    for (int i = 0; i < original.length - 1; i++) {
+      double left = original[i];
+      double right = original[i + 1];
+      if (Double.isNaN(left) || Double.isNaN(right)) {
+        continue;
+      }
+      cntEpsilon += 1;
+      meanEpsilon += right - left * this.theta;
+    }
+    meanEpsilon /= cntEpsilon;
+    for (int i = 0; i < original.length; i++) {
+      double yt = original[i];
+      if (!Double.isNaN(yt)) {
+        repaired[i] = yt;
+      } else if (i != 0) {
+        // predict from the previous (possibly already filled) point
+        repaired[i] = this.theta * repaired[i - 1] + meanEpsilon;
+      } else {
+        // no predecessor available for the first point: use the inherited mean field
+        repaired[i] = this.mean;
+      }
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LikelihoodFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LikelihoodFill.java
new file mode 100644
index 0000000000..f293a16734
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LikelihoodFill.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LikelihoodFill extends ValueFill {
+
+ private double stopErrorRatio = 0.0001;
+ private int stopIteration = Integer.MAX_VALUE;
+
+ public LikelihoodFill(RowIterator dataIterator) throws Exception {
+ super(dataIterator);
+ }
+
+ @Override
+ public void fill() {
+ // find missing points
+ List<Integer> repairIndexList = new ArrayList<>();
+ List<Double> repairValueList = new ArrayList<>();
+ // initialize linear interpolation
+ int previousNotNaN = -1;
+ for (int i = 0; i < n; i++) {
+ if (Double.isNaN(original[i])) {
+ repairIndexList.add(i);
+ repaired[i] = original[i];
+ } else {
+ if (previousNotNaN >= 0 && previousNotNaN + 1 != i) {
+ double delta =
+ (original[i] - original[previousNotNaN]) / (time[i] - time[previousNotNaN]);
+ for (int j = previousNotNaN + 1; j < i; j++) {
+ repaired[j] = original[previousNotNaN] + delta * (time[j] - time[previousNotNaN]);
+ repairValueList.add(repaired[j]);
+ }
+ }
+ repaired[i] = original[i];
+ previousNotNaN = i;
+ }
+ }
+ // update filled values iteratively
+ double errorRatio = 1;
+ int iteration = 0;
+ while (errorRatio > stopErrorRatio && iteration < stopIteration) {
+ errorRatio = 0.0;
+ // find best repair from last iteration
+ for (int i = 0; i < repairIndexList.size(); i++) {
+ int currentIndex = repairIndexList.get(i);
+ // no repair for data at beginning and end
+ if (currentIndex == 0
+ || Double.isNaN(repaired[currentIndex - 1])
+ || currentIndex == n - 1
+ || Double.isNaN(repaired[currentIndex + 1])) {
+ continue;
+ }
+ double intervalPrev1 = time[currentIndex] - time[currentIndex - 1];
+ double intervalPost1 = time[currentIndex + 1] - time[currentIndex];
+ double squareAPrev = 0.0, squareBPrev = 0.0;
+ if (currentIndex >= 2 && !Double.isNaN(repaired[currentIndex - 2])) {
+ double intervalPrev2 = time[currentIndex - 2] - time[currentIndex - 1];
+ squareAPrev = 1.0 / (intervalPrev1 * intervalPrev1);
+ squareBPrev =
+ 2.0 * repaired[currentIndex - 2] / (intervalPrev2 * intervalPrev1)
+ - 2.0
+ * (intervalPrev2 + intervalPrev1)
+ * repaired[currentIndex - 1]
+ / (intervalPrev2 * intervalPrev1 * intervalPrev1);
+ }
+ double squareACurr =
+ (intervalPrev1 + intervalPost1)
+ * (intervalPrev1 + intervalPost1)
+ / (intervalPrev1 * intervalPrev1 * intervalPost1 * intervalPost1);
+ double squareBCurr =
+ -2.0
+ * (intervalPrev1 + intervalPost1)
+ * repaired[currentIndex - 1]
+ / (intervalPrev1 * intervalPrev1 * intervalPost1)
+ - 2.0
+ * (intervalPrev1 + intervalPost1)
+ * repaired[currentIndex + 1]
+ / (intervalPrev1 * intervalPost1 * intervalPost1);
+ double squareAPost = 0.0, squareBPost = 0.0;
+ if (currentIndex <= n - 3 && !Double.isNaN(repaired[currentIndex + 2])) {
+ double intervalPost2 = time[currentIndex + 2] - time[currentIndex + 1];
+ squareAPost = 1.0 / (intervalPost1 * intervalPost1);
+ squareBPost =
+ 2.0 * repaired[currentIndex + 2] / (intervalPost1 * intervalPost2)
+ - 2.0
+ * (intervalPost1 + intervalPost2)
+ * repaired[currentIndex + 1]
+ / (intervalPost1 * intervalPost1 * intervalPost2);
+ }
+ // minimize likelihood
+ repairValueList.set(
+ i,
+ -(squareBPrev + squareBCurr + squareBPost)
+ / (2.0 * (squareAPrev + squareACurr + squareAPost)));
+ }
+ for (int i = 0; i < repairIndexList.size(); i++) {
+ int currentIndex = repairIndexList.get(i);
+ double previousRepair = repaired[currentIndex];
+ double updatedRepair = repairValueList.get(i);
+ double updatedRatio = Math.abs(updatedRepair - previousRepair) / previousRepair;
+ errorRatio = Math.max(updatedRatio, errorRatio);
+ repaired[currentIndex] = updatedRepair;
+ }
+ iteration++;
+ }
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LinearFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LinearFill.java
new file mode 100644
index 0000000000..8c8a41b29b
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LinearFill.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+/**
+ * Fills missing (NaN) points by linear interpolation over the point index.
+ *
+ * <p>Gaps between two observed points are interpolated linearly by position (not by timestamp).
+ * Missing points before the first observation take the first observed value; missing points
+ * after the last observation take the last observed value. If nothing is observed, {@code
+ * repaired} is left untouched.
+ */
+public class LinearFill extends ValueFill {
+
+  public LinearFill(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+  }
+
+  /** Computes the filled series into {@code repaired}. */
+  @Override
+  public void fill() {
+    // Kept local so that repeated calls start from a clean state.
+    int prevNotNaN = -1;
+    for (int i = 0; i < original.length; i++) {
+      if (Double.isNaN(original[i])) {
+        continue;
+      }
+      // Slope per index step; 0 when there is no earlier observation (leading gap).
+      // Index 0 is a valid earlier observation, hence ">= 0" rather than "> 0".
+      double k = 0;
+      if (prevNotNaN >= 0) {
+        k = (original[i] - original[prevNotNaN]) / (i - prevNotNaN);
+      }
+      for (int t = prevNotNaN + 1; t < i; t++) {
+        repaired[t] = original[i] + k * (t - i);
+      }
+      repaired[i] = original[i];
+      prevNotNaN = i;
+    }
+    // trailing gap: carry the last observation forward
+    if (prevNotNaN >= 0) {
+      for (int t = prevNotNaN + 1; t < original.length; t++) {
+        repaired[t] = original[prevNotNaN];
+      }
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LsGreedy.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LsGreedy.java
new file mode 100644
index 0000000000..276841c76f
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/LsGreedy.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+import org.apache.iotdb.library.util.Util;
+
+import java.util.PriorityQueue;
+
+/**
+ * Greedy value repair driven by speed changes: interior points whose speed change deviates from
+ * {@code center} by more than {@code 3 * sigma} are adjusted, largest deviation first, until no
+ * such point remains.
+ */
+public class LsGreedy extends ValueRepair {
+
+ // center: expected speed change (default 0); sigma: tolerated spread of the speed change.
+ private double center = 0, sigma;
+ // Numerical tolerance used when sigma is (almost) zero.
+ private final double eps = 1e-12;
+
+ public LsGreedy(RowIterator dataIterator) throws Exception {
+ super(dataIterator);
+ setParameters();
+ }
+
+ public LsGreedy(String filename) throws Exception {
+ super(filename);
+ setParameters();
+ }
+
+ // Derives the default sigma from the data: Util.mad of the variation of the speed series.
+ // NOTE(review): Util.speed / Util.variation / Util.mad are defined elsewhere; presumably mad is
+ // the median absolute deviation - confirm.
+ private void setParameters() {
+ double[] speed = Util.speed(original, time);
+ double[] speedchange = Util.variation(speed);
+ sigma = Util.mad(speedchange);
+ }
+
+ /**
+ * Repairs a copy of the original values in place. Only interior points (index 1 to n - 2) are
+ * candidates because a speed change needs both neighbours.
+ */
+ @Override
+ public void repair() {
+ repaired = original.clone();
+ // table[i] is the current node of point i, needed to remove stale nodes from the heap.
+ RepairNode[] table = new RepairNode[n];
+ // compareTo puts the largest deviation from center first, so peek() is the worst point.
+ PriorityQueue<RepairNode> heap = new PriorityQueue<>();
+ for (int i = 1; i < n - 1; i++) {
+ RepairNode node = new RepairNode(i);
+ table[i] = node;
+ if (Math.abs(node.getU() - center) > 3 * sigma) {
+ heap.add(node);
+ }
+ }
+ while (true) {
+ RepairNode top = heap.peek();
+ if (top == null || Math.abs(top.getU() - center) < Math.max(eps, 3 * sigma)) {
+ break;
+ } // stop greedy algorithm when the heap is empty or all speed changes locate in center±3sigma
+ top.modify();
+ // Changing one value alters the speed change of the point and of both neighbours, so the
+ // nodes of index - 1 .. index + 1 (clamped to the interior) are rebuilt.
+ for (int i = Math.max(1, top.getIndex() - 1); i <= Math.min(n - 2, top.getIndex() + 1); i++) {
+ heap.remove(table[i]);
+ RepairNode temp = new RepairNode(i);
+ table[i] = temp;
+ if (Math.abs(temp.getU() - center) > 3 * sigma) {
+ heap.add(temp);
+ }
+ }
+ }
+ }
+
+ /** Snapshot of the speed change at one interior point, taken from the current repaired values. */
+ class RepairNode implements Comparable<RepairNode> {
+
+ private final int index;
+ private final double u; // speed variation
+
+ // u = (speed from index to index + 1) - (speed from index - 1 to index).
+ public RepairNode(int index) {
+ this.index = index;
+ double v1 = repaired[index + 1] - repaired[index];
+ v1 = v1 / (time[index + 1] - time[index]);
+ double v2 = repaired[index] - repaired[index - 1];
+ v2 = v2 / (time[index] - time[index - 1]);
+ this.u = v1 - v2;
+ }
+
+ /**
+ * Shifts the repaired value at this point so that its speed change moves towards center. The
+ * speed change is reduced by max(sigma, |u - center| / 3), or by the full |u - center| when
+ * sigma is below eps.
+ */
+ public void modify() {
+ double temp;
+ if (sigma < eps) {
+ temp = Math.abs(u - center);
+ } else {
+ temp = Math.max(sigma, Math.abs(u - center) / 3);
+ }
+ // Convert the desired change of u into a change of the value: a value shift of d changes u
+ // by d * (dt1 + dt0) / (dt1 * dt0), with dt1 and dt0 the gaps to the next and previous point.
+ temp *=
+ (double) (time[index + 1] - time[index])
+ * (time[index] - time[index - 1])
+ / (time[index + 1] - time[index - 1]);
+ // Raising the value lowers u, lowering the value raises u.
+ if (this.u > center) {
+ repaired[index] += temp;
+ } else {
+ repaired[index] -= temp;
+ }
+ }
+
+ // Reverse order on |u - center|: the node with the larger deviation compares as smaller.
+ @Override
+ public int compareTo(RepairNode o) {
+ double u1 = Math.abs(this.u - center);
+ double u2 = Math.abs(o.u - center);
+ if (u1 > u2) {
+ return -1;
+ } else if (u1 == u2) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public double getU() {
+ return u;
+ }
+ }
+
+ public void setCenter(double center) {
+ this.center = center;
+ }
+
+ // Overrides the sigma derived from the data in setParameters().
+ public void setSigma(double sigma) {
+ this.sigma = sigma;
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MAFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MAFill.java
new file mode 100644
index 0000000000..c1e90d5382
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MAFill.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+/**
+ * Fills missing (NaN) points with a moving average.
+ *
+ * <p>Each missing point is replaced by the mean of the observed values inside a window of {@code
+ * window_size} points centred on it. Near the ends of the series the window is shifted inwards
+ * so that it stays inside the data. If the window contains no observed value, the point stays
+ * NaN.
+ */
+public class MAFill extends ValueFill {
+  int window_size = 5;
+  // Scratch state describing the most recently evaluated window: sum and count of its observed
+  // values and its inclusive bounds [l, r]. Recomputed for every missing point.
+  double window_sum = 0;
+  int window_cnt = 0;
+  int l = 0;
+  int r = window_size - 1;
+
+  public MAFill(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+  }
+
+  /** Computes the filled series into {@code repaired}. */
+  @Override
+  public void fill() {
+    final int length = original.length;
+    final int half = (window_size - 1) / 2;
+    for (int i = 0; i < length; i++) {
+      if (!Double.isNaN(original[i])) {
+        repaired[i] = original[i];
+        continue;
+      }
+      // Centre the window on i, then clamp it to the series. The previous sliding
+      // implementation never removed points that left the window, so it degraded into a
+      // cumulative mean.
+      l = Math.max(0, Math.min(i - half, length - window_size));
+      r = Math.min(length - 1, l + window_size - 1);
+      window_sum = 0;
+      window_cnt = 0;
+      for (int j = l; j <= r; j++) {
+        if (!Double.isNaN(original[j])) {
+          window_sum += original[j];
+          window_cnt += 1;
+        }
+      }
+      // 0.0 / 0 yields NaN, i.e. the point stays missing when the window has no observation
+      repaired[i] = window_sum / window_cnt;
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MeanFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MeanFill.java
new file mode 100644
index 0000000000..f7d8d6377b
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MeanFill.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+/** Fills every missing (NaN) value with the mean of the non-NaN values of the series. */
+public class MeanFill extends ValueFill {
+  /**
+   * Reads the whole series and precomputes its statistics via {@code calMeanAndVar()}.
+   *
+   * @param dataIterator iterator over the input rows
+   * @throws Exception if the parent constructor fails
+   */
+  public MeanFill(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+    calMeanAndVar();
+  }
+
+  /** Copies the series into {@code repaired}, substituting {@code mean} for each NaN. */
+  @Override
+  public void fill() {
+    int idx = 0;
+    for (double sample : original) {
+      repaired[idx++] = Double.isNaN(sample) ? mean : sample;
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/PreviousFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/PreviousFill.java
new file mode 100644
index 0000000000..a424d50b99
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/PreviousFill.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+public class PreviousFill extends ValueFill {
+
+ private long previousTime = -1;
+ private double previousValue;
+
+ public PreviousFill(RowIterator dataIterator) throws Exception {
+ super(dataIterator);
+ }
+
+ @Override
+ public void fill() {
+ // NaN at the beginning is not filled
+ for (int i = 0; i < original.length; i++) {
+ if (Double.isNaN(original[i])) {
+ if (previousTime == -1) { // 序列初始为空,直接pass
+ repaired[i] = original[i];
+ } else {
+ repaired[i] = previousValue;
+ }
+ } else {
+ previousTime = time[i];
+ previousValue = original[i];
+ repaired[i] = original[i];
+ }
+ }
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/Screen.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/Screen.java
new file mode 100644
index 0000000000..9bc3c93c75
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/Screen.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+import org.apache.iotdb.library.util.Util;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.math3.stat.descriptive.rank.Median;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+/**
+ * Speed-constraint based value repair.
+ *
+ * <p>Each point is replaced by the median of a set of candidates derived from the points that
+ * follow it within a time window of width {@code w}, and is then clamped to the range reachable
+ * from the already repaired predecessor under the speed limits {@code smin} and {@code smax}.
+ */
+public class Screen extends ValueRepair {
+
+  private double smin, smax;
+  private double w;
+
+  /**
+   * Loads the series from a row iterator and derives default parameters from it.
+   *
+   * @param dataIterator iterator over the input rows
+   * @throws Exception if the parent constructor fails
+   */
+  public Screen(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+    setParameters();
+  }
+
+  /**
+   * Loads the series from a file and derives default parameters from it.
+   *
+   * @param filename path of the input file
+   * @throws Exception if the parent constructor fails
+   */
+  public Screen(String filename) throws Exception {
+    super(filename);
+    setParameters();
+  }
+
+  /**
+   * Sets the defaults: the speed limits are the median speed plus/minus three times
+   * {@code Util.mad} of the speeds, and the window is five times the median time interval.
+   */
+  private void setParameters() {
+    double[] speeds = Util.speed(original, time);
+    Median medianOf = new Median();
+    double center = medianOf.evaluate(speeds);
+    double spread = Util.mad(speeds);
+    smax = center + 3 * spread;
+    smin = center - 3 * spread;
+    double[] gaps = Util.variation(time);
+    w = 5 * medianOf.evaluate(gaps);
+  }
+
+  /**
+   * Repairs the whole series into {@code repaired}.
+   *
+   * <p>Points are appended one by one; a pending point is repaired as soon as the newest point
+   * lies more than {@code w} after it, and all remaining points are repaired at the end.
+   */
+  @Override
+  public void repair() {
+    ArrayList<Pair<Long, Double>> points = new ArrayList<>();
+    points.add(Pair.of(time[0], original[0]));
+    int pending = 0;
+    for (int i = 1; i < n; i++) {
+      points.add(Pair.of(time[i], original[i]));
+      long newest = points.get(i).getLeft();
+      // The window of every pending point that ends before the newest point is complete.
+      for (; points.get(pending).getLeft() + w < newest; pending++) {
+        local(points, pending);
+      }
+    }
+    // Flush the points whose window reaches the end of the series.
+    for (; pending < n; pending++) {
+      local(points, pending);
+    }
+    for (int k = 0; k < points.size(); k++) {
+      this.repaired[k] = points.get(k).getRight();
+    }
+  }
+
+  /**
+   * Returns the median candidate for the point at {@code index}: its own value together with,
+   * for each following point inside the window, that point's value projected back with
+   * {@code smin} and with {@code smax}.
+   */
+  private double getMedian(ArrayList<Pair<Long, Double>> list, int index) {
+    long baseTime = list.get(index).getLeft();
+    int m = 0;
+    while (index + m + 1 < list.size() && list.get(index + m + 1).getLeft() <= baseTime + w) {
+      m++;
+    }
+    double[] candidates = new double[2 * m + 1];
+    candidates[0] = list.get(index).getRight();
+    for (int i = 1; i <= m; i++) {
+      Pair<Long, Double> later = list.get(index + i);
+      long gap = baseTime - later.getLeft();
+      candidates[i] = later.getRight() + smin * gap;
+      candidates[i + m] = later.getRight() + smax * gap;
+    }
+    Arrays.sort(candidates);
+    return candidates[m];
+  }
+
+  /** Clamps {@code mid} to the interval reachable from the point before {@code index}. */
+  private double getRepairedValue(ArrayList<Pair<Long, Double>> list, int index, double mid) {
+    Pair<Long, Double> previous = list.get(index - 1);
+    long elapsed = list.get(index).getLeft() - previous.getLeft();
+    double lower = previous.getRight() + smin * elapsed;
+    double upper = previous.getRight() + smax * elapsed;
+    return Math.max(lower, Math.min(upper, mid));
+  }
+
+  /** Computes the repaired value x_k' for the point at {@code index} and stores it in place. */
+  private void local(ArrayList<Pair<Long, Double>> list, int index) {
+    double mid = getMedian(list, index);
+    // The first point has no predecessor, so there is nothing to clamp against.
+    double fixed = (index == 0) ? mid : getRepairedValue(list, index, mid);
+    list.set(index, Pair.of(list.get(index).getLeft(), fixed));
+  }
+
+  /** Overrides the lower speed limit. */
+  public void setSmin(double smin) {
+    this.smin = smin;
+  }
+
+  /** Overrides the upper speed limit. */
+  public void setSmax(double smax) {
+    this.smax = smax;
+  }
+
+  /** Overrides the window width. */
+  public void setW(int w) {
+    this.w = w;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ScreenFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ScreenFill.java
new file mode 100644
index 0000000000..6badf0446f
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ScreenFill.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+import org.apache.iotdb.library.util.Util;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.math3.stat.descriptive.rank.Median;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+// Reference to Screen.java
+/**
+ * Speed-constraint based filling of missing (NaN) values.
+ *
+ * <p>Only NaN points are modified. A NaN point is replaced by the median of candidates projected
+ * back from the non-NaN points that follow it within a window of width {@code w}, clamped to the
+ * range reachable from its predecessor under the speed limits {@code smin} and {@code smax}.
+ * A NaN point with no valid successor inside its window is left as NaN.
+ */
+public class ScreenFill extends ValueFill {
+
+  private double smin, smax;
+  private double w;
+
+  /**
+   * Loads the series and derives default parameters from it.
+   *
+   * @param dataIterator iterator over the input rows
+   * @throws Exception if the parent constructor fails
+   */
+  public ScreenFill(RowIterator dataIterator) throws Exception {
+    super(dataIterator);
+    setParameters();
+  }
+
+  /**
+   * Fills the series into {@code repaired}.
+   *
+   * <p>The working list always mirrors the series up to {@code currentIndex}. When a NaN is met,
+   * the points inside its window are appended so that it can be filled; if that window contains
+   * another NaN, the list is rewound to that NaN so that it is processed next as a window start.
+   */
+  @Override
+  public void fill() {
+    ArrayList<Pair<Long, Double>> ans = new ArrayList<>();
+    int currentIndex = 0;
+    while (currentIndex < n) {
+      ans.add(Pair.of(time[currentIndex], original[currentIndex]));
+      if (!Double.isNaN(original[currentIndex])) {
+        currentIndex++;
+        continue;
+      }
+      int startIndex = currentIndex;
+      long fillTime = time[currentIndex];
+      int nextIndex = -1; // first NaN found inside the window, if any
+      currentIndex++;
+      while (currentIndex < n && fillTime + w >= time[currentIndex]) {
+        ans.add(Pair.of(time[currentIndex], original[currentIndex]));
+        if (Double.isNaN(original[currentIndex]) && nextIndex == -1) {
+          nextIndex = currentIndex;
+        }
+        currentIndex++;
+      }
+      local(ans, startIndex);
+      if (nextIndex > 0) {
+        // Rewind so that the next NaN starts its own window.
+        ans.subList(nextIndex, currentIndex).clear();
+        currentIndex = nextIndex;
+      }
+    }
+    int k = 0;
+    for (Pair<Long, Double> p : ans) {
+      this.repaired[k] = p.getRight();
+      k++;
+    }
+  }
+
+  /**
+   * Returns the median candidate for the point at {@code index}, built from the non-NaN points
+   * that follow it inside the window, each projected back with {@code smin} and {@code smax}.
+   *
+   * @return the median candidate, or NaN when the window holds no valid successor
+   */
+  private double getMedian(ArrayList<Pair<Long, Double>> list, int index) {
+    long baseTime = list.get(index).getLeft();
+    int m = 0;
+    while (index + m + 1 < list.size() && list.get(index + m + 1).getLeft() <= baseTime + w) {
+      m++;
+    }
+    int count = 0;
+    for (int i = 1; i <= m; i++) {
+      if (!Double.isNaN(list.get(index + i).getRight())) {
+        count++;
+      }
+    }
+    if (count == 0) {
+      // No candidate at all; indexing the empty candidate array would throw.
+      return Double.NaN;
+    }
+    double[] x = new double[2 * count];
+    int filled = 0;
+    for (int i = 1; i <= m; i++) {
+      Pair<Long, Double> later = list.get(index + i);
+      if (!Double.isNaN(later.getRight())) {
+        long gap = baseTime - later.getLeft();
+        x[filled] = later.getRight() + smin * gap;
+        x[filled + count] = later.getRight() + smax * gap;
+        filled++;
+      }
+    }
+    Arrays.sort(x);
+    return x[count];
+  }
+
+  /** Clamps {@code mid} to the interval reachable from the point before {@code index}. */
+  private double getRepairedValue(ArrayList<Pair<Long, Double>> list, int index, double mid) {
+    Pair<Long, Double> previous = list.get(index - 1);
+    long elapsed = list.get(index).getLeft() - previous.getLeft();
+    double xmin = previous.getRight() + smin * elapsed;
+    double xmax = previous.getRight() + smax * elapsed;
+    return Math.max(xmin, Math.min(xmax, mid));
+  }
+
+  /** Computes the fill value for the point at {@code index} and stores it in place. */
+  private void local(ArrayList<Pair<Long, Double>> list, int index) {
+    double mid = getMedian(list, index);
+    if (Double.isNaN(mid)) {
+      // Nothing to fill from; the point keeps its NaN value.
+      return;
+    }
+    // Clamp only when there is a usable predecessor; an unfilled (NaN) predecessor gives no bound.
+    boolean hasPredecessor = index > 0 && !Double.isNaN(list.get(index - 1).getRight());
+    double value = hasPredecessor ? getRepairedValue(list, index, mid) : mid;
+    list.set(index, Pair.of(list.get(index).getLeft(), value));
+  }
+
+  /**
+   * Sets the defaults: the speed limits are the median speed plus/minus three times
+   * {@code Util.mad} of the speeds, and the window is five times the median time interval.
+   */
+  private void setParameters() {
+    double[] speed = Util.speed(original, time);
+    Median median = new Median();
+    double mid = median.evaluate(speed);
+    double sigma = Util.mad(speed);
+    smax = mid + 3 * sigma;
+    smin = mid - 3 * sigma;
+    double[] interval = Util.variation(time);
+    w = 5 * median.evaluate(interval);
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimestampInterval.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimestampInterval.java
new file mode 100644
index 0000000000..ed1414b039
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimestampInterval.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import java.util.*;
+
+/**
+ * Estimates the standard sampling interval and the standard starting point of a series of
+ * timestamps.
+ */
+public class TimestampInterval {
+
+  protected int n;
+  protected long[] time;
+  protected double[] original;
+  protected long[] repaired;
+  protected long deltaT;
+  protected long start0;
+
+  /**
+   * Keeps references to the given series; the arrays are not copied.
+   *
+   * @param time timestamps of the series
+   * @param original values of the series
+   * @throws Exception never thrown by this implementation; kept for signature compatibility
+   */
+  public TimestampInterval(long[] time, double[] original) throws Exception {
+    this.time = time;
+    this.original = original;
+    n = time.length;
+    repaired = new long[n];
+  }
+
+  /**
+   * Determines the standard interval and stores it in {@code deltaT}.
+   *
+   * @param mode -1 for the median interval, -2 for the most frequent interval, -3 for the mean of
+   *     the largest interval cluster; any other value is used as the interval itself
+   * @return the chosen interval
+   */
+  public long getInterval(int mode) {
+    switch (mode) {
+      case -1:
+        this.deltaT = getIntervalByMedian();
+        break;
+      case -2:
+        this.deltaT = getIntervalByMode();
+        break;
+      case -3:
+        this.deltaT = getIntervalByCluster();
+        break;
+      default:
+        this.deltaT = mode;
+    }
+    return this.deltaT;
+  }
+
+  /** Returns the median of the {@code n - 1} consecutive intervals, or 0 if there are none. */
+  private long getIntervalByMedian() {
+    int m = n - 1;
+    if (m <= 0) {
+      return 0;
+    }
+    // Every one of the m intervals takes part, including the last one.
+    long[] gaps = new long[m];
+    for (int i = 0; i < m; i++) {
+      gaps[i] = time[i + 1] - time[i];
+    }
+    Arrays.sort(gaps);
+    if (m % 2 == 0) {
+      return (gaps[m / 2 - 1] + gaps[m / 2]) / 2;
+    }
+    return gaps[m / 2];
+  }
+
+  /**
+   * Returns the interval that occurs most often, or 0 if there is none. On a tie the interval
+   * that occurred first wins.
+   */
+  private long getIntervalByMode() {
+    repaired = time.clone();
+    // The insertion-ordered map makes the tie-break deterministic.
+    Map<Long, Integer> occurrences = new LinkedHashMap<>();
+    for (int i = 0; i < n - 1; i++) {
+      occurrences.merge(time[i + 1] - time[i], 1, Integer::sum);
+    }
+    int maxTimes = 0;
+    long maxTimesKey = 0;
+    for (Map.Entry<Long, Integer> entry : occurrences.entrySet()) {
+      if (entry.getValue() > maxTimes) {
+        maxTimes = entry.getValue();
+        maxTimesKey = entry.getKey();
+      }
+    }
+    return maxTimesKey;
+  }
+
+  /**
+   * Groups the intervals into three clusters by iterative nearest-mean assignment and returns
+   * the mean of the most populated cluster, or 0 if there are no intervals.
+   */
+  private long getIntervalByCluster() {
+    int count = n - 1;
+    if (count <= 0) {
+      return 0;
+    }
+    long[] intervals = new long[count];
+    long maxInterval = 0;
+    // Start from the largest long so that intervals of any magnitude are handled.
+    long minInterval = Long.MAX_VALUE;
+    for (int i = 0; i < count; i++) {
+      intervals[i] = time[i + 1] - time[i];
+      if (intervals[i] > maxInterval) {
+        maxInterval = intervals[i];
+      }
+      if (intervals[i] < minInterval) {
+        minInterval = intervals[i];
+      }
+    }
+    // Spread the initial means evenly between the smallest and the largest interval.
+    int k = 3;
+    long[] means = new long[k];
+    for (int i = 0; i < k; i++) {
+      means[i] = minInterval + (i + 1) * (maxInterval - minInterval) / (k + 1);
+    }
+    int[] results = new int[count];
+    Arrays.fill(results, -1);
+    int[] cnts = new int[k];
+    int maxClusterId = 0;
+    boolean changed = true;
+    while (changed) {
+      changed = false;
+      // Assignment step: attach every interval to its nearest mean (lowest index on a tie).
+      for (int i = 0; i < count; i++) {
+        long minDis = Long.MAX_VALUE;
+        int minDisId = 0;
+        for (int j = 0; j < k; j++) {
+          long dis = Math.abs(intervals[i] - means[j]);
+          if (dis < minDis) {
+            minDis = dis;
+            minDisId = j;
+          }
+        }
+        if (minDisId != results[i]) {
+          changed = true;
+          results[i] = minDisId;
+        }
+      }
+      // Update step: recompute the means and remember the most populated cluster.
+      int maxClusterCnt = 0;
+      for (int i = 0; i < k; i++) {
+        long sum = 0;
+        cnts[i] = 0;
+        for (int j = 0; j < count; j++) {
+          if (results[j] == i) {
+            sum += intervals[j];
+            cnts[i] += 1;
+          }
+        }
+        if (cnts[i] != 0) {
+          means[i] = sum / cnts[i];
+          if (cnts[i] > maxClusterCnt) {
+            maxClusterId = i;
+            maxClusterCnt = cnts[i];
+          }
+        }
+      }
+    }
+    return means[maxClusterId];
+  }
+
+  /**
+   * Determines the standard starting point and stores it in {@code start0}. Uses the interval
+   * chosen by the last call to {@link #getInterval(int)}.
+   *
+   * @param mode 1 for the linear estimate, 2 for the most frequent phase; any other value leaves
+   *     {@code start0} untouched
+   * @return the current starting point
+   */
+  public long getStart0(int mode) {
+    switch (mode) {
+      case 1:
+        this.start0 = getStart0ByLinear();
+        break;
+      case 2:
+        this.start0 = getStart0ByMode();
+        break;
+    }
+    return this.start0;
+  }
+
+  /** Returns the average of {@code time[i] - deltaT * i} over all points. */
+  private long getStart0ByLinear() {
+    long sum_ = 0;
+    for (int i = 0; i < n; i++) {
+      sum_ += time[i];
+      sum_ -= this.deltaT * i;
+    }
+    return sum_ / n;
+  }
+
+  /**
+   * Finds the most frequent remainder of the timestamps modulo {@code deltaT}, takes the last
+   * timestamp with that remainder and steps it back by {@code deltaT} until it is no later than
+   * the first timestamp.
+   */
+  private long getStart0ByMode() {
+    long[] modn = new long[n];
+    Map<Long, Integer> occurrences = new LinkedHashMap<>();
+    for (int i = 0; i < n; i++) {
+      modn[i] = time[i] % this.deltaT;
+      occurrences.merge(modn[i], 1, Integer::sum);
+    }
+    int maxTimesn = 0;
+    long maxTimesMode = 0;
+    for (Map.Entry<Long, Integer> entry : occurrences.entrySet()) {
+      if (entry.getValue() > maxTimesn) {
+        maxTimesn = entry.getValue();
+        maxTimesMode = entry.getKey();
+      }
+    }
+    long st = 0;
+    // Only the last matching timestamp decides the result, so walk back from the end and stop.
+    for (int i = n - 1; i >= 0; i--) {
+      if (modn[i] == maxTimesMode) {
+        st = time[i];
+        while (st > time[0]) {
+          st -= deltaT;
+        }
+        break;
+      }
+    }
+    return st;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimestampRepair.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimestampRepair.java
new file mode 100644
index 0000000000..54e7769401
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/TimestampRepair.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+import org.apache.iotdb.library.util.Util;
+
+import java.util.*;
+
+/**
+ * Aligns the timestamps of a series to a regular grid {@code start0 + k * deltaT} using dynamic
+ * programming over three operations: match/move a point onto a grid slot, add a grid slot that
+ * has no point, and delete a point that has no slot.
+ */
+public class TimestampRepair {
+
+  protected int n;
+  protected long[] time;
+  protected double[] original;
+  protected long[] repaired;
+  protected double[] repairedValue;
+  protected long deltaT;
+  protected long start0;
+
+  /**
+   * Reads the whole series and determines the grid interval and starting point.
+   *
+   * @param dataIterator iterator over the input rows
+   * @param intervalMode passed to {@code TimestampInterval.getInterval}
+   * @param startPointMode passed to {@code TimestampInterval.getStart0}
+   * @throws Exception if reading the rows or estimating the grid fails
+   */
+  public TimestampRepair(RowIterator dataIterator, int intervalMode, int startPointMode)
+      throws Exception {
+    ArrayList<Long> timeList = new ArrayList<>();
+    ArrayList<Double> originList = new ArrayList<>();
+    while (dataIterator.hasNextRow()) {
+      Row row = dataIterator.next();
+      double v = Util.getValueAsDouble(row);
+      timeList.add(row.getTime());
+      // Infinite values are treated like missing ones.
+      originList.add(Double.isFinite(v) ? v : Double.NaN);
+    }
+    time = Util.toLongArray(timeList);
+    original = Util.toDoubleArray(originList);
+    n = time.length;
+    TimestampInterval trParam = new TimestampInterval(time, original);
+    this.deltaT = trParam.getInterval(intervalMode);
+    this.start0 = trParam.getStart0(startPointMode);
+  }
+
+  /** Publishes the input series unchanged; the result arrays are allocated here. */
+  private void noRepair() {
+    repaired = time.clone();
+    repairedValue = original.clone();
+  }
+
+  /**
+   * Computes the repaired series into {@code repaired} and {@code repairedValue}.
+   *
+   * <p>Series with at most two points are returned unchanged. Grid slots without a matching
+   * point get the value NaN.
+   */
+  public void dpRepair() {
+    if (time.length <= 2) {
+      noRepair();
+      return;
+    }
+    // NOTE(review): the division below is integral, so Math.ceil has no effect and the slot count
+    // is floor((last - start0) / deltaT) + 1. Kept as is; confirm whether rounding up was meant.
+    int n_ = (int) Math.ceil((time[n - 1] - start0) / deltaT + 1);
+    repaired = new long[n_];
+    repairedValue = new double[n_];
+    int m_ = this.n;
+    // f[i][j]: minimal cost of aligning the first i grid slots with the first j points.
+    long[][] f = new long[n_ + 1][m_ + 1];
+    // steps[i][j]: operation chosen at (i, j); 0 = match or move, 1 = add slot, 2 = delete point.
+    int[][] steps = new int[n_ + 1][m_ + 1];
+    int addCostRatio = 100000;
+    for (int i = 0; i < n_ + 1; i++) {
+      f[i][0] = (long) addCostRatio * i;
+      steps[i][0] = 1;
+    }
+    for (int i = 0; i < m_ + 1; i++) {
+      f[0][i] = (long) addCostRatio * i;
+      steps[0][i] = 2;
+    }
+
+    for (int i = 1; i < n_ + 1; i++) {
+      long slot = start0 + (i - 1) * deltaT;
+      for (int j = 1; j < m_ + 1; j++) {
+        if (time[j - 1] == slot) {
+          // An exact match costs nothing on top of the best alignment before this pair.
+          f[i][j] = f[i - 1][j - 1];
+          steps[i][j] = 0;
+          continue;
+        }
+        // Cheaper of adding the slot or deleting the point ...
+        if (f[i - 1][j] < f[i][j - 1]) {
+          f[i][j] = f[i - 1][j] + addCostRatio;
+          steps[i][j] = 1;
+        } else {
+          f[i][j] = f[i][j - 1] + addCostRatio;
+          steps[i][j] = 2;
+        }
+        // ... versus moving the point onto the slot, which costs the time distance.
+        long modifyResult = f[i - 1][j - 1] + Math.abs(time[j - 1] - slot);
+        if (modifyResult < f[i][j]) {
+          f[i][j] = modifyResult;
+          steps[i][j] = 0;
+        }
+      }
+    }
+
+    // Walk the chosen operations backwards from the last slot and the last point.
+    int i = n_;
+    int j = m_;
+    while (i >= 1 && j >= 1) {
+      long ps = start0 + (i - 1) * deltaT;
+      if (steps[i][j] == 0) {
+        repaired[i - 1] = ps;
+        repairedValue[i - 1] = original[j - 1];
+        i--;
+        j--;
+      } else if (steps[i][j] == 1) {
+        repaired[i - 1] = ps;
+        repairedValue[i - 1] = Double.NaN;
+        i--;
+      } else {
+        // The point is deleted: it produces no output.
+        j--;
+      }
+    }
+    // Slots left over once all points are consumed are added points as well; without this they
+    // would be reported with timestamp 0 and value 0.0.
+    while (i >= 1) {
+      repaired[i - 1] = start0 + (i - 1) * deltaT;
+      repairedValue[i - 1] = Double.NaN;
+      i--;
+    }
+  }
+
+  /** Returns the repaired values, aligned with {@link #getRepaired()}. */
+  public double[] getRepairedValue() {
+    return repairedValue;
+  }
+
+  /** Returns the repaired timestamps. */
+  public long[] getRepaired() {
+    return repaired;
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ValueFill.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ValueFill.java
new file mode 100644
index 0000000000..b3bdb33eec
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ValueFill.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+import org.apache.iotdb.library.util.Util;
+
+import java.util.ArrayList;
+
+public abstract class ValueFill {
+ protected int n;
+ protected long[] time;
+ protected double[] original;
+ protected double[] repaired;
+ protected double mean = 0;
+ protected double var = 0;
+ protected int not_nan_number = 0;
+
+ public ValueFill(RowIterator dataIterator) throws Exception {
+ ArrayList<Long> timeList = new ArrayList<>();
+ ArrayList<Double> originList = new ArrayList<>();
+ while (dataIterator.hasNextRow()) {
+ Row row = dataIterator.next();
+ Double v = Util.getValueAsDouble(row);
+ timeList.add(row.getTime());
+ if (!Double.isFinite(v)) {
+ originList.add(Double.NaN);
+ } else {
+ originList.add(v);
+ }
+ }
+ time = Util.toLongArray(timeList);
+ original = Util.toDoubleArray(originList);
+ n = time.length;
+ repaired = new double[n];
+ }
+
+ public abstract void fill();
+
+ public long[] getTime() {
+ return time;
+ }
+
+ public double[] getFilled() {
+ return repaired;
+ };
+
+ public void calMeanAndVar() {
+ for (double v : original) {
+ if (!Double.isNaN(v)) {
+ mean += v;
+ not_nan_number += 1;
+ }
+ }
+ assert not_nan_number > 0 : "All values are NaN";
+ mean /= not_nan_number;
+ for (double v : original) {
+ if (!Double.isNaN(v)) {
+ var += (v - mean) * (v - mean);
+ }
+ }
+ var /= not_nan_number;
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ValueRepair.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ValueRepair.java
new file mode 100644
index 0000000000..5b47ac776c
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/ValueRepair.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+import org.apache.iotdb.library.util.Util;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Scanner;
+
+/**
+ * Base class of the value repairing algorithms: loads a series, replaces missing values by
+ * linear interpolation, and lets subclasses write the repaired series into {@code repaired}.
+ */
+public abstract class ValueRepair {
+
+  protected int n;
+  protected long[] time;
+  protected double[] original;
+  protected double[] repaired;
+
+  /**
+   * Reads all rows into memory. Non-finite values are treated as missing and interpolated.
+   *
+   * @param dataIterator iterator over the input rows
+   * @throws Exception if reading a row fails or fewer than two valid values are present
+   */
+  public ValueRepair(RowIterator dataIterator) throws Exception {
+    ArrayList<Long> timeList = new ArrayList<>();
+    ArrayList<Double> originList = new ArrayList<>();
+    while (dataIterator.hasNextRow()) {
+      Row row = dataIterator.next();
+      double v = Util.getValueAsDouble(row);
+      timeList.add(row.getTime());
+      originList.add(Double.isFinite(v) ? v : Double.NaN);
+    }
+    time = Util.toLongArray(timeList);
+    original = Util.toDoubleArray(originList);
+    n = time.length;
+    repaired = new double[n];
+    processNaN();
+  }
+
+  /**
+   * Reads the series from a delimited text file. The first line is skipped; every following
+   * record is a {@code yyyy-MM-dd HH:mm:ss} timestamp followed by a numeric value.
+   *
+   * @param filename path of the input file
+   * @throws Exception if the file cannot be read or parsed, or fewer than two valid values are
+   *     present
+   */
+  public ValueRepair(String filename) throws Exception {
+    ArrayList<Long> timeList = new ArrayList<>();
+    ArrayList<Double> originList = new ArrayList<>();
+    // try-with-resources releases the file handle even when parsing fails.
+    try (Scanner sc = new Scanner(new File(filename))) {
+      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+      sc.useDelimiter("\\s*(,|\\r|\\n)\\s*");
+      sc.nextLine();
+      while (sc.hasNext()) {
+        timeList.add(format.parse(sc.next()).getTime());
+        double v = sc.nextDouble();
+        originList.add(Double.isFinite(v) ? v : Double.NaN);
+      }
+    }
+    time = Util.toLongArray(timeList);
+    original = Util.toDoubleArray(originList);
+    n = time.length;
+    repaired = new double[n];
+    processNaN();
+  }
+
+  /** Writes the repaired series into {@code repaired}. */
+  public abstract void repair();
+
+  /**
+   * Replaces every NaN in {@code original} by linear interpolation between the nearest valid
+   * neighbours; leading and trailing NaNs are extrapolated from the first, respectively last,
+   * two valid values.
+   *
+   * @throws Exception if the series holds fewer than two valid values
+   */
+  private void processNaN() throws Exception {
+    // Locate the first two valid values.
+    int index1 = 0;
+    while (index1 < n && Double.isNaN(original[index1])) {
+      index1++;
+    }
+    int index2 = index1 + 1;
+    while (index2 < n && Double.isNaN(original[index2])) {
+      index2++;
+    }
+    if (index2 >= n) {
+      throw new Exception("At least two non-NaN values are needed");
+    }
+    // Everything before the second valid value lies on the line through the first two.
+    for (int i = 0; i < index2; i++) {
+      original[i] = interpolate(index1, index2, i);
+    }
+    // Each further valid value closes a gap that is filled from its two end points.
+    for (int i = index2 + 1; i < n; i++) {
+      if (!Double.isNaN(original[i])) {
+        index1 = index2;
+        index2 = i;
+        for (int j = index1 + 1; j < index2; j++) {
+          original[j] = interpolate(index1, index2, j);
+        }
+      }
+    }
+    // Whatever follows the last valid value is extrapolated from the last two valid values.
+    for (int i = index2 + 1; i < n; i++) {
+      original[i] = interpolate(index1, index2, i);
+    }
+  }
+
+  /** Evaluates, at {@code time[at]}, the line through the points at {@code left} and {@code right}. */
+  private double interpolate(int left, int right, int at) {
+    return original[left]
+        + (original[right] - original[left])
+            * (time[at] - time[left])
+            / (time[right] - time[left]);
+  }
+
+  /** Returns the timestamps of the series. */
+  public long[] getTime() {
+    return time;
+  }
+
+  /** Returns the repaired values; meaningful after {@link #repair()}. */
+  public double[] getRepaired() {
+    return repaired;
+  }
+}
diff --git a/library-udf/src/test/java/org/apache/iotdb/library/drepair/DRepairTests.java b/library-udf/src/test/java/org/apache/iotdb/library/drepair/DRepairTests.java
new file mode 100644
index 0000000000..4bf48b4b37
--- /dev/null
+++ b/library-udf/src/test/java/org/apache/iotdb/library/drepair/DRepairTests.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.drepair;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.integration.env.ConfigFactory;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.Assert.fail;
+
+ /**
+  * Smoke tests for the data-repairing library UDFs ({@code timestamprepair}, {@code valuefill}
+  * and {@code valuerepair}).
+  *
+  * <p>Each test only checks that the query can be executed without an {@link SQLException}; the
+  * returned rows are not inspected.
+  */
+ public class DRepairTests {
+   protected static final int ITERATION_TIMES = 100_000;
+   protected static final int DELTA_T = 100;
+
+   // Original UDF memory budgets, captured so that tearDown() can restore them.
+   private static final float oldUdfCollectorMemoryBudgetInMB =
+       IoTDBDescriptor.getInstance().getConfig().getUdfCollectorMemoryBudgetInMB();
+   private static final float oldUdfTransformerMemoryBudgetInMB =
+       IoTDBDescriptor.getInstance().getConfig().getUdfTransformerMemoryBudgetInMB();
+   private static final float oldUdfReaderMemoryBudgetInMB =
+       IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
+
+   /** Lowers the UDF memory budgets, starts the environment and prepares series, data and UDFs. */
+   @BeforeClass
+   public static void setUp() throws Exception {
+     ConfigFactory.getConfig()
+         .setUdfCollectorMemoryBudgetInMB(5)
+         .setUdfTransformerMemoryBudgetInMB(5)
+         .setUdfReaderMemoryBudgetInMB(5);
+     EnvFactory.getEnv().initBeforeClass();
+     createTimeSeries();
+     generateData();
+     registerUDF();
+   }
+
+   /** Creates the storage group and one series per UDF under test. */
+   private static void createTimeSeries() throws MetadataException {
+     IoTDB.metaManager.setStorageGroup(new PartialPath("root.vehicle"));
+     // test series for TimeStampRepair
+     createPlainSeries("root.vehicle.d1.s1", TSDataType.INT32);
+     // test series for ValueFill
+     createPlainSeries("root.vehicle.d2.s1", TSDataType.INT64);
+     // test series for ValueRepair
+     createPlainSeries("root.vehicle.d3.s1", TSDataType.FLOAT);
+   }
+
+   /** Creates an uncompressed, PLAIN-encoded series of the given type at {@code path}. */
+   private static void createPlainSeries(String path, TSDataType dataType)
+       throws MetadataException {
+     IoTDB.metaManager.createTimeseries(
+         new PartialPath(path), dataType, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null);
+   }
+
+   /** Opens a JDBC connection to the local IoTDB instance with the default root credentials. */
+   private static Connection openConnection() throws SQLException {
+     return DriverManager.getConnection(
+         Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+   }
+
+   /**
+    * Draws one random test value.
+    *
+    * <p>NOTE(review): {@code %} binds tighter than {@code +}, so this evaluates as {@code floor(x
+    * + ((random * y) % (y - x + 1)))}. With the borders used here (-100, 100) the values fall in
+    * [-100, 0); possibly the full [x, y] range was intended. Left unchanged on purpose.
+    */
+   private static double randomValue(double x, double y) {
+     return Math.floor(x + Math.random() * y % (y - x + 1));
+   }
+
+   /** Fills the three test series with {@link #ITERATION_TIMES} points each. */
+   private static void generateData() {
+     double x = -100d, y = 100d; // borders of random value
+     try (Connection connection = openConnection();
+         Statement statement = connection.createStatement()) {
+       // d1: regular timestamps, roughly 1% of them shifted by up to half an interval.
+       for (int i = 1; i <= ITERATION_TIMES; ++i) {
+         long timestamp = (long) i * DELTA_T;
+         if (Math.random() >= 0.99) {
+           timestamp += (long) Math.floor((Math.random() - 0.5) * DELTA_T);
+         }
+         statement.execute(
+             String.format(
+                 "insert into root.vehicle.d1(timestamp,s1) values(%d,%f)",
+                 timestamp, randomValue(x, y)));
+       }
+       // d2: regular timestamps, roughly 3% of the values replaced by NaN.
+       for (int i = 1; i <= ITERATION_TIMES; ++i) {
+         double value = Math.random() < 0.97 ? randomValue(x, y) : Double.NaN;
+         statement.execute(
+             String.format(
+                 "insert into root.vehicle.d2(timestamp,s1) values(%d,%f)",
+                 (long) i * DELTA_T, value));
+       }
+       // d3: a linear trend plus uniform noise.
+       for (int i = 1; i <= ITERATION_TIMES; ++i) {
+         statement.execute(
+             String.format(
+                 "insert into root.vehicle.d3(timestamp,s1) values(%d,%f)",
+                 (long) i * DELTA_T,
+                 i / (double) ITERATION_TIMES * (y - x) + (Math.random() - 0.5) * (y - x)));
+       }
+     } catch (SQLException throwable) {
+       fail(throwable.getMessage());
+     }
+   }
+
+   /**
+    * Registers the three UDFs under test.
+    *
+    * <p>The implementing classes live in the {@code drepair} package (the same package as this
+    * test), so they are registered under that package name.
+    */
+   private static void registerUDF() {
+     try (Connection connection = EnvFactory.getEnv().getConnection();
+         Statement statement = connection.createStatement()) {
+       statement.execute(
+           "create function timestamprepair as 'org.apache.iotdb.library.drepair.UDTFTimestampRepair'");
+       statement.execute(
+           "create function valuefill as 'org.apache.iotdb.library.drepair.UDTFValueFill'");
+       statement.execute(
+           "create function valuerepair as 'org.apache.iotdb.library.drepair.UDTFValueRepair'");
+     } catch (SQLException throwable) {
+       fail(throwable.getMessage());
+     }
+   }
+
+   /** Shuts the environment down and restores the original UDF memory budgets. */
+   @AfterClass
+   public static void tearDown() throws Exception {
+     EnvFactory.getEnv().cleanAfterClass();
+     ConfigFactory.getConfig()
+         .setUdfCollectorMemoryBudgetInMB(oldUdfCollectorMemoryBudgetInMB)
+         .setUdfTransformerMemoryBudgetInMB(oldUdfTransformerMemoryBudgetInMB)
+         .setUdfReaderMemoryBudgetInMB(oldUdfReaderMemoryBudgetInMB);
+   }
+
+   /**
+    * Executes {@code sqlStr} on a fresh connection and fails the current test if an {@link
+    * SQLException} is raised. The result set is closed without being read.
+    */
+   private static void assertQueryExecutes(String sqlStr) {
+     try (Connection connection = openConnection();
+         Statement statement = connection.createStatement();
+         ResultSet ignored = statement.executeQuery(sqlStr)) {
+       // Nothing to inspect: the test only checks that the query can be executed.
+     } catch (SQLException throwable) {
+       fail(throwable.getMessage());
+     }
+   }
+
+   @Test
+   public void testTimestampRepair1() {
+     assertQueryExecutes(
+         String.format("select timestamprepair(d1.s1,'interval'='%d') from root.vehicle", DELTA_T));
+   }
+
+   @Test
+   public void testTimestampRepair2() {
+     assertQueryExecutes("select timestamprepair(d1.s1,'method'='median') from root.vehicle");
+   }
+
+   @Test
+   public void testTimestampRepair3() {
+     assertQueryExecutes("select timestamprepair(d1.s1,'method'='mode') from root.vehicle");
+   }
+
+   @Test
+   public void testTimestampRepair4() {
+     assertQueryExecutes("select timestamprepair(d1.s1,'method'='cluster') from root.vehicle");
+   }
+
+   @Test
+   public void testValueFill1() {
+     assertQueryExecutes("select valuefill(d2.s1,'method'='previous') from root.vehicle");
+   }
+
+   @Test
+   public void testValueFill2() {
+     assertQueryExecutes("select valuefill(d2.s1,'method'='linear') from root.vehicle");
+   }
+
+   @Test
+   public void testValueFill3() {
+     assertQueryExecutes("select valuefill(d2.s1,'method'='likelihood') from root.vehicle");
+   }
+
+   @Test
+   public void testValueFill4() {
+     assertQueryExecutes("select valuefill(d2.s1,'method'='ar') from root.vehicle");
+   }
+
+   @Test
+   public void testValueFill5() {
+     assertQueryExecutes("select valuefill(d2.s1,'method'='ma') from root.vehicle");
+   }
+
+   @Test
+   public void testValueFill6() {
+     assertQueryExecutes("select valuefill(d2.s1,'method'='screen') from root.vehicle");
+   }
+
+   @Test
+   public void testValueRepair1() {
+     assertQueryExecutes("select valuerepair(d3.s1,'method'='screen') from root.vehicle");
+   }
+
+   @Test
+   public void testValueRepair2() {
+     assertQueryExecutes("select valuerepair(d3.s1,'method'='lsgreedy') from root.vehicle");
+   }
+ }