You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/20 07:30:46 UTC
[iotdb] branch master updated: [IOTDB-4967] Fix SlidingTimeWindow stops when encounter an empty window (#8033)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 53bc567b3e [IOTDB-4967] Fix SlidingTimeWindow stops when encounter an empty window (#8033)
53bc567b3e is described below
commit 53bc567b3e799a8b85d75982eeac86896230c148
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Sun Nov 20 15:30:40 2022 +0800
[IOTDB-4967] Fix SlidingTimeWindow stops when encounter an empty window (#8033)
---
.../SlidingTimeWindowAccessStrategyExample.java | 105 +++++++++++++++++++++
.../iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java | 31 ++++++
...izableRowRecordListBackedMultiColumnWindow.java | 12 +++
...SerializableTVListBackedSingleColumnWindow.java | 12 +++
.../dag/adapter/EmptyRowIterator.java | 40 ++++++++
.../MultiInputColumnIntermediateLayer.java | 16 +++-
...InputColumnMultiReferenceIntermediateLayer.java | 18 +++-
...nputColumnSingleReferenceIntermediateLayer.java | 18 +++-
.../org/apache/iotdb/udf/api/access/RowWindow.java | 3 +-
.../iotdb/udf/api/collector/PointCollector.java | 4 +-
10 files changed, 251 insertions(+), 8 deletions(-)
diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/SlidingTimeWindowAccessStrategyExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/SlidingTimeWindowAccessStrategyExample.java
new file mode 100644
index 0000000000..a9115b3c6c
--- /dev/null
+++ b/example/udf/src/main/java/org/apache/iotdb/udf/SlidingTimeWindowAccessStrategyExample.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlidingTimeWindowAccessStrategyExample implements UDTF {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(SlidingTimeWindowAccessStrategyExample.class);
+
+ public SlidingTimeWindowAccessStrategyExample() {}
+
+ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+ throws Exception {
+ LOGGER.info("###### TestSlidingTimeWindow # beforeStart #######");
+ LOGGER.info("attributes: {}", parameters.getAttributes().toString());
+ if (parameters.hasAttribute("start") && parameters.hasAttribute("end")) {
+ if (parameters.hasAttribute("step")) {
+ configurations
+ .setOutputDataType(Type.INT64)
+ .setAccessStrategy(
+ new SlidingTimeWindowAccessStrategy(
+ (long) parameters.getInt("interval"),
+ (long) parameters.getInt("step"),
+ parameters.getLong("start"),
+ parameters.getLong("end")));
+ } else {
+ configurations
+ .setOutputDataType(Type.INT64)
+ .setAccessStrategy(
+ new SlidingTimeWindowAccessStrategy(
+ (long) parameters.getInt("interval"),
+ (long) parameters.getInt("interval"),
+ parameters.getLong("start"),
+ parameters.getLong("end")));
+ }
+ } else {
+ if (parameters.hasAttribute("start") || parameters.hasAttribute("end")) {
+ throw new RuntimeException("start and end must be both existed. ");
+ }
+
+ if (parameters.hasAttribute("step")) {
+ configurations
+ .setOutputDataType(Type.INT64)
+ .setAccessStrategy(
+ new SlidingTimeWindowAccessStrategy(
+ (long) parameters.getInt("interval"), (long) parameters.getInt("step")));
+ } else {
+ configurations
+ .setOutputDataType(Type.INT64)
+ .setAccessStrategy(
+ new SlidingTimeWindowAccessStrategy((long) parameters.getInt("interval")));
+ }
+ }
+ }
+
+ public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+ LOGGER.info("######### TestSlidingTimeWindow # [{}] ########", rowWindow.windowSize());
+ long result = 0L;
+
+ for (int i = 0; i < rowWindow.windowSize(); ++i) {
+ if (!rowWindow.getRow(i).isNull(0)) {
+ result += rowWindow.getRow(i).getLong(0);
+ }
+ }
+
+ collector.putLong(rowWindow.windowStartTime(), result);
+ }
+
+ public void beforeDestroy() {
+ LOGGER.info("###### TestSlidingTimeWindow # beforeDestroy #######");
+ }
+
+ public void validate(UDFParameterValidator validator) throws Exception {
+ validator.validateRequiredAttribute("interval");
+ validator.validateInputSeriesDataType(0, Type.INT64);
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
index b6029e9970..6d14e5d47a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
@@ -39,6 +39,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@@ -87,6 +88,7 @@ public class IoTDBUDFWindowQueryIT {
statement.execute("CREATE DATABASE root.vehicle");
statement.execute("CREATE TIMESERIES root.vehicle.d1.s1 with datatype=INT32,encoding=PLAIN");
statement.execute("CREATE TIMESERIES root.vehicle.d1.s2 with datatype=INT32,encoding=PLAIN");
+ statement.execute("CREATE TIMESERIES root.vehicle1.d1.s1 with datatype=INT32,encoding=PLAIN");
} catch (SQLException throwable) {
fail(throwable.getMessage());
}
@@ -100,6 +102,8 @@ public class IoTDBUDFWindowQueryIT {
(String.format(
"insert into root.vehicle.d1(timestamp,s1,s2) values(%d,%d,%d)", i, i, i)));
}
+ // test empty window; details can be found at https://github.com/apache/iotdb/issues/7738
+ statement.execute("insert into root.vehicle1.d1(timestamp, s1) values (1,2),(2,3),(7,8)");
} catch (SQLException throwable) {
fail(throwable.getMessage());
}
@@ -431,6 +435,33 @@ public class IoTDBUDFWindowQueryIT {
}
}
+ @Test
+ public void testSlidingTimeWindowWithEmptyWindow() {
+ String sql =
+ String.format(
+ "select time_window_tester(s1, '%s'='%s') from root.vehicle1.d1",
+ UDFTestConstant.TIME_INTERVAL_KEY, 3);
+ try (Connection conn = EnvFactory.getEnv().getConnection();
+ Statement statement = conn.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
+ assertEquals(2, resultSet.getMetaData().getColumnCount());
+
+ int count = 0;
+ if (resultSet.next()) {
+ assertEquals(5, (int) (Double.parseDouble(resultSet.getString(2))));
+ ++count;
+ }
+ if (resultSet.next()) {
+ assertEquals(8, (int) (Double.parseDouble(resultSet.getString(2))));
+ ++count;
+ }
+ assertFalse(resultSet.next());
+ assertEquals(2, count);
+ } catch (SQLException throwable) {
+ fail();
+ }
+ }
+
@Test
public void testSlidingTimeWindowWithTimeIntervalOnly1() {
testSlidingTimeWindowWithTimeIntervalOnly(1);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableRowRecordListBackedMultiColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
index 9ba6e5c156..863de28e66 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
@@ -63,6 +63,9 @@ public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements
@Override
public Row getRow(int rowIndex) throws IOException {
+ if (this.size == 0) {
+ throw new IndexOutOfBoundsException("Size is 0");
+ }
return row.setRowRecord(rowRecordList.getRowRecord(beginIndex + rowIndex));
}
@@ -73,6 +76,9 @@ public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements
@Override
public RowIterator getRowIterator() {
+ if (this.size == 0) {
+ return new EmptyRowIterator();
+ }
if (rowIterator == null) {
rowIterator =
new ElasticSerializableRowRecordListBackedMultiColumnWindowIterator(
@@ -93,6 +99,12 @@ public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements
return endTime;
}
+ public void setEmptyWindow(long startTime, long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.size = 0;
+ }
+
public void seek(int beginIndex, int endIndex, long startTime, long endTime) {
this.beginIndex = beginIndex;
this.endIndex = endIndex;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnWindow.java
index 64e360d3de..37f22901f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnWindow.java
@@ -56,6 +56,9 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
@Override
public Row getRow(int rowIndex) {
+ if (this.size == 0) {
+ throw new IndexOutOfBoundsException("Size is 0");
+ }
return row.seek(beginIndex + rowIndex);
}
@@ -66,6 +69,9 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
@Override
public RowIterator getRowIterator() {
+ if (this.size == 0) {
+ return new EmptyRowIterator();
+ }
if (rowIterator == null) {
rowIterator =
new ElasticSerializableTVListBackedSingleColumnWindowIterator(
@@ -86,6 +92,12 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
return endTime;
}
+ public void setEmptyWindow(long startTime, long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.size = 0;
+ }
+
public void seek(int beginIndex, int endIndex, long startTime, long endTime) {
this.beginIndex = beginIndex;
this.endIndex = endIndex;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/EmptyRowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/EmptyRowIterator.java
new file mode 100644
index 0000000000..ebb714e332
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/EmptyRowIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.adapter;
+
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowIterator;
+
+import java.io.IOException;
+
+public class EmptyRowIterator implements RowIterator {
+ @Override
+ public boolean hasNextRow() {
+ return false;
+ }
+
+ @Override
+ public Row next() throws IOException {
+ throw new UnsupportedOperationException("Can not call next on EmptyRowIterator");
+ }
+
+ @Override
+ public void reset() {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
index 4ac11c77ec..b2c9c731f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
@@ -570,13 +570,18 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
break;
}
}
+ if ((nextIndexEnd == nextIndexBegin)
+ && nextWindowTimeEnd < rowRecordList.getTime(rowRecordList.size() - 1)) {
+ window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+ return YieldableState.YIELDABLE;
+ }
window.seek(
nextIndexBegin,
nextIndexEnd,
nextWindowTimeBegin,
nextWindowTimeBegin + timeInterval - 1);
- hasCached = nextIndexBegin != nextIndexEnd;
+ hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == rowRecordList.size());
return hasCached ? YieldableState.YIELDABLE : YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
}
@@ -626,13 +631,20 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
break;
}
}
+
+ if ((nextIndexEnd == nextIndexBegin)
+ && nextWindowTimeEnd < rowRecordList.getTime(rowRecordList.size() - 1)) {
+ window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+ return true;
+ }
+
window.seek(
nextIndexBegin,
nextIndexEnd,
nextWindowTimeBegin,
nextWindowTimeBegin + timeInterval - 1);
- hasCached = nextIndexBegin != nextIndexEnd;
+ hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == rowRecordList.size());
return hasCached;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
index 5b6216eea0..165a079120 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -459,13 +459,20 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
break;
}
}
+
+ if ((nextIndexEnd == nextIndexBegin)
+ && nextWindowTimeEnd < tvList.getTime(tvList.size() - 1)) {
+ window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+ return YieldableState.YIELDABLE;
+ }
+
window.seek(
nextIndexBegin,
nextIndexEnd,
nextWindowTimeBegin,
nextWindowTimeBegin + timeInterval - 1);
- hasCached = nextIndexBegin != nextIndexEnd;
+ hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == tvList.size());
return hasCached ? YieldableState.YIELDABLE : YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
}
@@ -516,13 +523,20 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
break;
}
}
+
+ if ((nextIndexEnd == nextIndexBegin)
+ && nextWindowTimeEnd < tvList.getTime(tvList.size() - 1)) {
+ window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+ return true;
+ }
+
window.seek(
nextIndexBegin,
nextIndexEnd,
nextWindowTimeBegin,
nextWindowTimeBegin + timeInterval - 1);
- hasCached = nextIndexBegin != nextIndexEnd;
+ hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == tvList.size());
return hasCached;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
index 58b5bbb8ac..d6bd270c41 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -332,13 +332,20 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
break;
}
}
+
+ if ((nextIndexEnd == nextIndexBegin)
+ && nextWindowTimeEnd < tvList.getTime(tvList.size() - 1)) {
+ window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+ return YieldableState.YIELDABLE;
+ }
+
window.seek(
nextIndexBegin,
nextIndexEnd,
nextWindowTimeBegin,
nextWindowTimeBegin + timeInterval - 1);
- hasCached = nextIndexBegin != nextIndexEnd;
+ hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == tvList.size());
return hasCached ? YieldableState.YIELDABLE : YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
}
@@ -388,13 +395,20 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
break;
}
}
+
+ if ((nextIndexEnd == nextIndexBegin)
+ && nextWindowTimeEnd < tvList.getTime(tvList.size() - 1)) {
+ window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+ return true;
+ }
+
window.seek(
nextIndexBegin,
nextIndexEnd,
nextWindowTimeBegin,
nextWindowTimeBegin + timeInterval - 1);
- hasCached = nextIndexBegin != nextIndexEnd;
+ hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == tvList.size());
return hasCached;
}
diff --git a/udf-api/src/main/java/org/apache/iotdb/udf/api/access/RowWindow.java b/udf-api/src/main/java/org/apache/iotdb/udf/api/access/RowWindow.java
index 8d94a4cb63..782c98f509 100644
--- a/udf-api/src/main/java/org/apache/iotdb/udf/api/access/RowWindow.java
+++ b/udf-api/src/main/java/org/apache/iotdb/udf/api/access/RowWindow.java
@@ -42,7 +42,8 @@ public interface RowWindow {
* will not generate a new Row instance.
*
* @param rowIndex index of the row to return
- * @return the row at the specified position in this window
+ * @return the row at the specified position in this window; throws IndexOutOfBoundsException if
+ * this method is called on an empty RowWindow.
* @throws IOException if any I/O errors occur
*/
Row getRow(int rowIndex) throws IOException;
diff --git a/udf-api/src/main/java/org/apache/iotdb/udf/api/collector/PointCollector.java b/udf-api/src/main/java/org/apache/iotdb/udf/api/collector/PointCollector.java
index 40270d1f0d..b5ed276d31 100644
--- a/udf-api/src/main/java/org/apache/iotdb/udf/api/collector/PointCollector.java
+++ b/udf-api/src/main/java/org/apache/iotdb/udf/api/collector/PointCollector.java
@@ -31,7 +31,9 @@ import java.io.IOException;
/**
* Used to collect time series data points generated by {@link UDTF#transform(Row, PointCollector)},
- * {@link UDTF#transform(RowWindow, PointCollector)} or {@link UDTF#terminate(PointCollector)}.
+ * {@link UDTF#transform(RowWindow, PointCollector)} or {@link UDTF#terminate(PointCollector)}.
+ * Note that a timestamp must not be put into the PointCollector more than once; otherwise the
+ * calculation may stop.
*/
public interface PointCollector {