You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/22 02:45:43 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-4967] SlidingTimeWindow stops when encounter an empty window
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 67869f3fff [To rel/0.13][IOTDB-4967] SlidingTimeWindow stops when encounter an empty window
67869f3fff is described below
commit 67869f3fff4d7187d1cfae8867349ba64e8dbe8b
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Tue Nov 22 10:45:36 2022 +0800
[To rel/0.13][IOTDB-4967] SlidingTimeWindow stops when encounter an empty window
---
.../db/integration/IoTDBUDFWindowQueryIT.java | 39 ++++++++++++++++++---
.../iotdb/db/query/udf/api/access/RowWindow.java | 3 +-
.../db/query/udf/api/collector/PointCollector.java | 4 ++-
...izableRowRecordListBackedMultiColumnWindow.java | 12 +++++++
...SerializableTVListBackedSingleColumnWindow.java | 12 +++++++
.../db/query/udf/core/access/EmptyRowIterator.java | 40 ++++++++++++++++++++++
.../layer/MultiInputColumnIntermediateLayer.java | 8 ++++-
...InputColumnMultiReferenceIntermediateLayer.java | 7 +++-
...nputColumnSingleReferenceIntermediateLayer.java | 8 ++++-
9 files changed, 124 insertions(+), 9 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDFWindowQueryIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDFWindowQueryIT.java
index 808d319aaf..1037601c88 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDFWindowQueryIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDFWindowQueryIT.java
@@ -37,6 +37,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
@Category({LocalStandaloneTest.class, ClusterTest.class})
@@ -62,6 +63,7 @@ public class IoTDBUDFWindowQueryIT {
statement.execute("SET STORAGE GROUP TO root.vehicle");
statement.execute("CREATE TIMESERIES root.vehicle.d1.s1 with datatype=INT32,encoding=PLAIN");
statement.execute("CREATE TIMESERIES root.vehicle.d1.s2 with datatype=INT32,encoding=PLAIN");
+ statement.execute("CREATE TIMESERIES root.vehicle1.d1.s1 with datatype=INT32,encoding=PLAIN");
} catch (SQLException throwable) {
fail(throwable.getMessage());
}
@@ -75,6 +77,8 @@ public class IoTDBUDFWindowQueryIT {
(String.format(
"insert into root.vehicle.d1(timestamp,s1,s2) values(%d,%d,%d)", i, i, i)));
}
+ // test empty window; details can be found at https://github.com/apache/iotdb/issues/7738
+ statement.execute("insert into root.vehicle1.d1(timestamp, s1) values (1,2),(2,3),(7,8)");
} catch (SQLException throwable) {
fail(throwable.getMessage());
}
@@ -406,6 +410,33 @@ public class IoTDBUDFWindowQueryIT {
}
}
+ @Test
+ public void testSlidingTimeWindowWithEmptyWindow() {
+ String sql =
+ String.format(
+ "select time_window_tester(s1, '%s'='%s') from root.vehicle1.d1",
+ ExampleUDFConstant.TIME_INTERVAL_KEY, 3);
+ try (Connection conn = EnvFactory.getEnv().getConnection();
+ Statement statement = conn.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
+ assertEquals(2, resultSet.getMetaData().getColumnCount());
+
+ int count = 0;
+ if (resultSet.next()) {
+ assertEquals(5, (int) (Double.parseDouble(resultSet.getString(2))));
+ ++count;
+ }
+ if (resultSet.next()) {
+ assertEquals(8, (int) (Double.parseDouble(resultSet.getString(2))));
+ ++count;
+ }
+ assertFalse(resultSet.next());
+ assertEquals(2, count);
+ } catch (SQLException throwable) {
+ fail();
+ }
+ }
+
@Test
public void testSlidingTimeWindowWithTimeIntervalOnly1() {
testSlidingTimeWindowWithTimeIntervalOnly(1);
@@ -705,7 +736,7 @@ public class IoTDBUDFWindowQueryIT {
time += 1000;
value += 1000000;
}
- Assert.assertFalse(rs.next());
+ assertFalse(rs.next());
}
query =
@@ -720,7 +751,7 @@ public class IoTDBUDFWindowQueryIT {
time += 1000;
value += 1000000D;
}
- Assert.assertFalse(rs.next());
+ assertFalse(rs.next());
}
} catch (SQLException e) {
e.printStackTrace();
@@ -750,7 +781,7 @@ public class IoTDBUDFWindowQueryIT {
time += 1000;
value += 1000000;
}
- Assert.assertFalse(rs.next());
+ assertFalse(rs.next());
}
query =
@@ -771,7 +802,7 @@ public class IoTDBUDFWindowQueryIT {
time += 1000;
value += 1000000D;
}
- Assert.assertFalse(rs.next());
+ assertFalse(rs.next());
}
} catch (SQLException e) {
e.printStackTrace();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/api/access/RowWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/api/access/RowWindow.java
index 54b6caf0ec..cdfd2ebb8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/api/access/RowWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/api/access/RowWindow.java
@@ -42,7 +42,8 @@ public interface RowWindow {
* will not generate a new Row instance.
*
* @param rowIndex index of the row to return
- * @return the row at the specified position in this window
+ * @return the row at the specified position in this window; throws IndexOutOfBoundsException
+ * if this method is called on an empty RowWindow.
* @throws IOException if any I/O errors occur
*/
Row getRow(int rowIndex) throws IOException;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/api/collector/PointCollector.java b/server/src/main/java/org/apache/iotdb/db/query/udf/api/collector/PointCollector.java
index d1f4b9fafb..beedb5ff39 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/api/collector/PointCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/api/collector/PointCollector.java
@@ -32,7 +32,9 @@ import java.io.IOException;
/**
* Used to collect time series data points generated by {@link UDTF#transform(Row, PointCollector)},
- * {@link UDTF#transform(RowWindow, PointCollector)} or {@link UDTF#terminate(PointCollector)}.
+ * {@link UDTF#transform(RowWindow, PointCollector)} or {@link UDTF#terminate(PointCollector)}.
+ * Note that a timestamp cannot be put into the PointCollector more than once; doing so may stop
+ * the calculation.
*/
public interface PointCollector {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
index d384267516..f5e3ea4e6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
@@ -61,6 +61,9 @@ public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements
@Override
public Row getRow(int rowIndex) throws IOException {
+ if (this.size == 0) {
+ throw new IndexOutOfBoundsException("Size is 0");
+ }
return row.setRowRecord(rowRecordList.getRowRecord(beginIndex + rowIndex));
}
@@ -71,6 +74,9 @@ public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements
@Override
public RowIterator getRowIterator() {
+ if (this.size == 0) {
+ return new EmptyRowIterator();
+ }
if (rowIterator == null) {
rowIterator =
new ElasticSerializableRowRecordListBackedMultiColumnWindowIterator(
@@ -81,6 +87,12 @@ public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements
return rowIterator;
}
+ public void setEmptyWindow(long startTime, long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.size = 0;
+ }
+
@Override
public long windowStartTime() {
return startTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
index f8fcf92d22..4911522ea2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
@@ -55,6 +55,9 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
@Override
public Row getRow(int rowIndex) {
+ if (this.size == 0) {
+ throw new IndexOutOfBoundsException("Size is 0");
+ }
return row.seek(beginIndex + rowIndex);
}
@@ -65,6 +68,9 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
@Override
public RowIterator getRowIterator() {
+ if (this.size == 0) {
+ return new EmptyRowIterator();
+ }
if (rowIterator == null) {
rowIterator =
new ElasticSerializableTVListBackedSingleColumnWindowIterator(
@@ -75,6 +81,12 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
return rowIterator;
}
+ public void setEmptyWindow(long startTime, long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.size = 0;
+ }
+
@Override
public long windowStartTime() {
return startTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/EmptyRowIterator.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/EmptyRowIterator.java
new file mode 100644
index 0000000000..ca0478534b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/EmptyRowIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.access;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+import java.io.IOException;
+
+public class EmptyRowIterator implements RowIterator {
+ @Override
+ public boolean hasNextRow() {
+ return false;
+ }
+
+ @Override
+ public Row next() throws IOException {
+ throw new UnsupportedOperationException("Can not call next on EmptyRowIterator");
+ }
+
+ @Override
+ public void reset() {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
index 843bd85504..b390cd618f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
@@ -351,13 +351,19 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
break;
}
}
+
+ if ((nextIndexEnd == nextIndexBegin)
+ && nextWindowTimeEnd < rowRecordList.getTime(rowRecordList.size() - 1)) {
+ window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+ return true;
+ }
window.seek(
nextIndexBegin,
nextIndexEnd,
nextWindowTimeBegin,
nextWindowTimeBegin + timeInterval - 1);
- hasCached = nextIndexBegin != nextIndexEnd;
+ hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == rowRecordList.size());
return hasCached;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
index c1359d5771..7edd5ff90d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -341,13 +341,18 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
break;
}
}
+ if ((nextIndexEnd == nextIndexBegin)
+ && nextWindowTimeEnd < tvList.getTime(tvList.size() - 1)) {
+ window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+ return true;
+ }
window.seek(
nextIndexBegin,
nextIndexEnd,
nextWindowTimeBegin,
nextWindowTimeBegin + timeInterval - 1);
- hasCached = nextIndexBegin != nextIndexEnd;
+ hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == tvList.size());
return hasCached;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
index 29549ccdd0..e3cd04cccc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -254,13 +254,19 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
break;
}
}
+
+ if ((nextIndexEnd == nextIndexBegin)
+ && nextWindowTimeEnd < tvList.getTime(tvList.size() - 1)) {
+ window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+ return true;
+ }
window.seek(
nextIndexBegin,
nextIndexEnd,
nextWindowTimeBegin,
nextWindowTimeBegin + timeInterval - 1);
- hasCached = nextIndexBegin != nextIndexEnd;
+ hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == tvList.size());
return hasCached;
}