You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/22 02:45:43 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-4967] SlidingTimeWindow stops when encounter an empty window

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 67869f3fff [To rel/0.13][IOTDB-4967] SlidingTimeWindow stops when encounter an empty window
67869f3fff is described below

commit 67869f3fff4d7187d1cfae8867349ba64e8dbe8b
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Tue Nov 22 10:45:36 2022 +0800

    [To rel/0.13][IOTDB-4967] SlidingTimeWindow stops when encounter an empty window
---
 .../db/integration/IoTDBUDFWindowQueryIT.java      | 39 ++++++++++++++++++---
 .../iotdb/db/query/udf/api/access/RowWindow.java   |  3 +-
 .../db/query/udf/api/collector/PointCollector.java |  4 ++-
 ...izableRowRecordListBackedMultiColumnWindow.java | 12 +++++++
 ...SerializableTVListBackedSingleColumnWindow.java | 12 +++++++
 .../db/query/udf/core/access/EmptyRowIterator.java | 40 ++++++++++++++++++++++
 .../layer/MultiInputColumnIntermediateLayer.java   |  8 ++++-
 ...InputColumnMultiReferenceIntermediateLayer.java |  7 +++-
 ...nputColumnSingleReferenceIntermediateLayer.java |  8 ++++-
 9 files changed, 124 insertions(+), 9 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDFWindowQueryIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDFWindowQueryIT.java
index 808d319aaf..1037601c88 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDFWindowQueryIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDFWindowQueryIT.java
@@ -37,6 +37,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 @Category({LocalStandaloneTest.class, ClusterTest.class})
@@ -62,6 +63,7 @@ public class IoTDBUDFWindowQueryIT {
       statement.execute("SET STORAGE GROUP TO root.vehicle");
       statement.execute("CREATE TIMESERIES root.vehicle.d1.s1 with datatype=INT32,encoding=PLAIN");
       statement.execute("CREATE TIMESERIES root.vehicle.d1.s2 with datatype=INT32,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.vehicle1.d1.s1 with datatype=INT32,encoding=PLAIN");
     } catch (SQLException throwable) {
       fail(throwable.getMessage());
     }
@@ -75,6 +77,8 @@ public class IoTDBUDFWindowQueryIT {
             (String.format(
                 "insert into root.vehicle.d1(timestamp,s1,s2) values(%d,%d,%d)", i, i, i)));
       }
+      // test empty window, details can be found at https://github.com/apache/iotdb/issues/7738
+      statement.execute("insert into root.vehicle1.d1(timestamp, s1) values (1,2),(2,3),(7,8)");
     } catch (SQLException throwable) {
       fail(throwable.getMessage());
     }
@@ -406,6 +410,33 @@ public class IoTDBUDFWindowQueryIT {
     }
   }
 
+  @Test
+  public void testSlidingTimeWindowWithEmptyWindow() {
+    String sql =
+        String.format(
+            "select time_window_tester(s1, '%s'='%s') from root.vehicle1.d1",
+            ExampleUDFConstant.TIME_INTERVAL_KEY, 3);
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(2, resultSet.getMetaData().getColumnCount());
+
+      int count = 0;
+      if (resultSet.next()) {
+        assertEquals(5, (int) (Double.parseDouble(resultSet.getString(2))));
+        ++count;
+      }
+      if (resultSet.next()) {
+        assertEquals(8, (int) (Double.parseDouble(resultSet.getString(2))));
+        ++count;
+      }
+      assertFalse(resultSet.next());
+      assertEquals(2, count);
+    } catch (SQLException throwable) {
+      fail();
+    }
+  }
+
   @Test
   public void testSlidingTimeWindowWithTimeIntervalOnly1() {
     testSlidingTimeWindowWithTimeIntervalOnly(1);
@@ -705,7 +736,7 @@ public class IoTDBUDFWindowQueryIT {
           time += 1000;
           value += 1000000;
         }
-        Assert.assertFalse(rs.next());
+        assertFalse(rs.next());
       }
 
       query =
@@ -720,7 +751,7 @@ public class IoTDBUDFWindowQueryIT {
           time += 1000;
           value += 1000000D;
         }
-        Assert.assertFalse(rs.next());
+        assertFalse(rs.next());
       }
     } catch (SQLException e) {
       e.printStackTrace();
@@ -750,7 +781,7 @@ public class IoTDBUDFWindowQueryIT {
           time += 1000;
           value += 1000000;
         }
-        Assert.assertFalse(rs.next());
+        assertFalse(rs.next());
       }
 
       query =
@@ -771,7 +802,7 @@ public class IoTDBUDFWindowQueryIT {
           time += 1000;
           value += 1000000D;
         }
-        Assert.assertFalse(rs.next());
+        assertFalse(rs.next());
       }
     } catch (SQLException e) {
       e.printStackTrace();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/api/access/RowWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/api/access/RowWindow.java
index 54b6caf0ec..cdfd2ebb8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/api/access/RowWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/api/access/RowWindow.java
@@ -42,7 +42,8 @@ public interface RowWindow {
    * will not generate a new Row instance.
    *
    * @param rowIndex index of the row to return
-   * @return the row at the specified position in this window
+   * @return the row at the specified position in this window; throws
+   *     IndexOutOfBoundsException if this method is called on an empty RowWindow.
    * @throws IOException if any I/O errors occur
    */
   Row getRow(int rowIndex) throws IOException;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/api/collector/PointCollector.java b/server/src/main/java/org/apache/iotdb/db/query/udf/api/collector/PointCollector.java
index d1f4b9fafb..beedb5ff39 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/api/collector/PointCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/api/collector/PointCollector.java
@@ -32,7 +32,9 @@ import java.io.IOException;
 
 /**
  * Used to collect time series data points generated by {@link UDTF#transform(Row, PointCollector)},
- * {@link UDTF#transform(RowWindow, PointCollector)} or {@link UDTF#terminate(PointCollector)}.
+ * {@link UDTF#transform(RowWindow, PointCollector)} or {@link UDTF#terminate(PointCollector)}.
+ * Note that the same timestamp must not be put into the PointCollector more than once, or the
+ * calculation may stop.
  */
 public interface PointCollector {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
index d384267516..f5e3ea4e6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
@@ -61,6 +61,9 @@ public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements
 
   @Override
   public Row getRow(int rowIndex) throws IOException {
+    if (this.size == 0) {
+      throw new IndexOutOfBoundsException("Size is 0");
+    }
     return row.setRowRecord(rowRecordList.getRowRecord(beginIndex + rowIndex));
   }
 
@@ -71,6 +74,9 @@ public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements
 
   @Override
   public RowIterator getRowIterator() {
+    if (this.size == 0) {
+      return new EmptyRowIterator();
+    }
     if (rowIterator == null) {
       rowIterator =
           new ElasticSerializableRowRecordListBackedMultiColumnWindowIterator(
@@ -81,6 +87,12 @@ public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements
     return rowIterator;
   }
 
+  public void setEmptyWindow(long startTime, long endTime) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.size = 0;
+  }
+
   @Override
   public long windowStartTime() {
     return startTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
index f8fcf92d22..4911522ea2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
@@ -55,6 +55,9 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
 
   @Override
   public Row getRow(int rowIndex) {
+    if (this.size == 0) {
+      throw new IndexOutOfBoundsException("Size is 0");
+    }
     return row.seek(beginIndex + rowIndex);
   }
 
@@ -65,6 +68,9 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
 
   @Override
   public RowIterator getRowIterator() {
+    if (this.size == 0) {
+      return new EmptyRowIterator();
+    }
     if (rowIterator == null) {
       rowIterator =
           new ElasticSerializableTVListBackedSingleColumnWindowIterator(
@@ -75,6 +81,12 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
     return rowIterator;
   }
 
+  public void setEmptyWindow(long startTime, long endTime) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.size = 0;
+  }
+
   @Override
   public long windowStartTime() {
     return startTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/EmptyRowIterator.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/EmptyRowIterator.java
new file mode 100644
index 0000000000..ca0478534b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/EmptyRowIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.access;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowIterator;
+
+import java.io.IOException;
+
+public class EmptyRowIterator implements RowIterator {
+  @Override
+  public boolean hasNextRow() {
+    return false;
+  }
+
+  @Override
+  public Row next() throws IOException {
+    throw new UnsupportedOperationException("Can not call next on EmptyRowIterator");
+  }
+
+  @Override
+  public void reset() {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
index 843bd85504..b390cd618f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnIntermediateLayer.java
@@ -351,13 +351,19 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
             break;
           }
         }
+
+        if ((nextIndexEnd == nextIndexBegin)
+            && nextWindowTimeEnd < rowRecordList.getTime(rowRecordList.size() - 1)) {
+          window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+          return true;
+        }
         window.seek(
             nextIndexBegin,
             nextIndexEnd,
             nextWindowTimeBegin,
             nextWindowTimeBegin + timeInterval - 1);
 
-        hasCached = nextIndexBegin != nextIndexEnd;
+        hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == rowRecordList.size());
         return hasCached;
       }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
index c1359d5771..7edd5ff90d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -341,13 +341,18 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
             break;
           }
         }
+        if ((nextIndexEnd == nextIndexBegin)
+            && nextWindowTimeEnd < tvList.getTime(tvList.size() - 1)) {
+          window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+          return true;
+        }
         window.seek(
             nextIndexBegin,
             nextIndexEnd,
             nextWindowTimeBegin,
             nextWindowTimeBegin + timeInterval - 1);
 
-        hasCached = nextIndexBegin != nextIndexEnd;
+        hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == tvList.size());
         return hasCached;
       }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
index 29549ccdd0..e3cd04cccc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -254,13 +254,19 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
             break;
           }
         }
+
+        if ((nextIndexEnd == nextIndexBegin)
+            && nextWindowTimeEnd < tvList.getTime(tvList.size() - 1)) {
+          window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+          return true;
+        }
         window.seek(
             nextIndexBegin,
             nextIndexEnd,
             nextWindowTimeBegin,
             nextWindowTimeBegin + timeInterval - 1);
 
-        hasCached = nextIndexBegin != nextIndexEnd;
+        hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == tvList.size());
         return hasCached;
       }