You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/20 07:30:46 UTC

[iotdb] branch master updated: [IOTDB-4967] Fix SlidingTimeWindow stops when encounter an empty window (#8033)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 53bc567b3e [IOTDB-4967] Fix SlidingTimeWindow stops when encounter an empty window  (#8033)
53bc567b3e is described below

commit 53bc567b3e799a8b85d75982eeac86896230c148
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Sun Nov 20 15:30:40 2022 +0800

    [IOTDB-4967] Fix SlidingTimeWindow stops when encounter an empty window  (#8033)
---
 .../SlidingTimeWindowAccessStrategyExample.java    | 105 +++++++++++++++++++++
 .../iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java     |  31 ++++++
 ...izableRowRecordListBackedMultiColumnWindow.java |  12 +++
 ...SerializableTVListBackedSingleColumnWindow.java |  12 +++
 .../dag/adapter/EmptyRowIterator.java              |  40 ++++++++
 .../MultiInputColumnIntermediateLayer.java         |  16 +++-
 ...InputColumnMultiReferenceIntermediateLayer.java |  18 +++-
 ...nputColumnSingleReferenceIntermediateLayer.java |  18 +++-
 .../org/apache/iotdb/udf/api/access/RowWindow.java |   3 +-
 .../iotdb/udf/api/collector/PointCollector.java    |   4 +-
 10 files changed, 251 insertions(+), 8 deletions(-)

diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/SlidingTimeWindowAccessStrategyExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/SlidingTimeWindowAccessStrategyExample.java
new file mode 100644
index 0000000000..a9115b3c6c
--- /dev/null
+++ b/example/udf/src/main/java/org/apache/iotdb/udf/SlidingTimeWindowAccessStrategyExample.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlidingTimeWindowAccessStrategyExample implements UDTF {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SlidingTimeWindowAccessStrategyExample.class);
+
+  public SlidingTimeWindowAccessStrategyExample() {}
+
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    LOGGER.info("###### TestSlidingTimeWindow # beforeStart #######");
+    LOGGER.info("attributes: {}", parameters.getAttributes().toString());
+    if (parameters.hasAttribute("start") && parameters.hasAttribute("end")) {
+      if (parameters.hasAttribute("step")) {
+        configurations
+            .setOutputDataType(Type.INT64)
+            .setAccessStrategy(
+                new SlidingTimeWindowAccessStrategy(
+                    (long) parameters.getInt("interval"),
+                    (long) parameters.getInt("step"),
+                    parameters.getLong("start"),
+                    parameters.getLong("end")));
+      } else {
+        configurations
+            .setOutputDataType(Type.INT64)
+            .setAccessStrategy(
+                new SlidingTimeWindowAccessStrategy(
+                    (long) parameters.getInt("interval"),
+                    (long) parameters.getInt("interval"),
+                    parameters.getLong("start"),
+                    parameters.getLong("end")));
+      }
+    } else {
+      if (parameters.hasAttribute("start") || parameters.hasAttribute("end")) {
+        throw new RuntimeException("start and end must be both existed. ");
+      }
+
+      if (parameters.hasAttribute("step")) {
+        configurations
+            .setOutputDataType(Type.INT64)
+            .setAccessStrategy(
+                new SlidingTimeWindowAccessStrategy(
+                    (long) parameters.getInt("interval"), (long) parameters.getInt("step")));
+      } else {
+        configurations
+            .setOutputDataType(Type.INT64)
+            .setAccessStrategy(
+                new SlidingTimeWindowAccessStrategy((long) parameters.getInt("interval")));
+      }
+    }
+  }
+
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    LOGGER.info("######### TestSlidingTimeWindow # [{}] ########", rowWindow.windowSize());
+    long result = 0L;
+
+    for (int i = 0; i < rowWindow.windowSize(); ++i) {
+      if (!rowWindow.getRow(i).isNull(0)) {
+        result += rowWindow.getRow(i).getLong(0);
+      }
+    }
+
+    collector.putLong(rowWindow.windowStartTime(), result);
+  }
+
+  public void beforeDestroy() {
+    LOGGER.info("###### TestSlidingTimeWindow # beforeDestroy #######");
+  }
+
+  public void validate(UDFParameterValidator validator) throws Exception {
+    validator.validateRequiredAttribute("interval");
+    validator.validateInputSeriesDataType(0, Type.INT64);
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
index b6029e9970..6d14e5d47a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
@@ -39,6 +39,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
@@ -87,6 +88,7 @@ public class IoTDBUDFWindowQueryIT {
       statement.execute("CREATE DATABASE root.vehicle");
       statement.execute("CREATE TIMESERIES root.vehicle.d1.s1 with datatype=INT32,encoding=PLAIN");
       statement.execute("CREATE TIMESERIES root.vehicle.d1.s2 with datatype=INT32,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.vehicle1.d1.s1 with datatype=INT32,encoding=PLAIN");
     } catch (SQLException throwable) {
       fail(throwable.getMessage());
     }
@@ -100,6 +102,8 @@ public class IoTDBUDFWindowQueryIT {
             (String.format(
                 "insert into root.vehicle.d1(timestamp,s1,s2) values(%d,%d,%d)", i, i, i)));
       }
+      // test empty window, details could be found at https://github.com/apache/iotdb/issues/7738
+      statement.execute("insert into root.vehicle1.d1(timestamp, s1) values (1,2),(2,3),(7,8)");
     } catch (SQLException throwable) {
       fail(throwable.getMessage());
     }
@@ -431,6 +435,33 @@ public class IoTDBUDFWindowQueryIT {
     }
   }
 
+  @Test
+  public void testSlidingTimeWindowWithEmptyWindow() {
+    String sql =
+        String.format(
+            "select time_window_tester(s1, '%s'='%s') from root.vehicle1.d1",
+            UDFTestConstant.TIME_INTERVAL_KEY, 3);
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(2, resultSet.getMetaData().getColumnCount());
+
+      int count = 0;
+      if (resultSet.next()) {
+        assertEquals(5, (int) (Double.parseDouble(resultSet.getString(2))));
+        ++count;
+      }
+      if (resultSet.next()) {
+        assertEquals(8, (int) (Double.parseDouble(resultSet.getString(2))));
+        ++count;
+      }
+      assertFalse(resultSet.next());
+      assertEquals(2, count);
+    } catch (SQLException throwable) {
+      fail();
+    }
+  }
+
   @Test
   public void testSlidingTimeWindowWithTimeIntervalOnly1() {
     testSlidingTimeWindowWithTimeIntervalOnly(1);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableRowRecordListBackedMultiColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
index 9ba6e5c156..863de28e66 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableRowRecordListBackedMultiColumnWindow.java
@@ -63,6 +63,9 @@ public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements
 
   @Override
   public Row getRow(int rowIndex) throws IOException {
+    if (this.size == 0) {
+      throw new IndexOutOfBoundsException("Size is 0");
+    }
     return row.setRowRecord(rowRecordList.getRowRecord(beginIndex + rowIndex));
   }
 
@@ -73,6 +76,9 @@ public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements
 
   @Override
   public RowIterator getRowIterator() {
+    if (this.size == 0) {
+      return new EmptyRowIterator();
+    }
     if (rowIterator == null) {
       rowIterator =
           new ElasticSerializableRowRecordListBackedMultiColumnWindowIterator(
@@ -93,6 +99,12 @@ public class ElasticSerializableRowRecordListBackedMultiColumnWindow implements
     return endTime;
   }
 
+  public void setEmptyWindow(long startTime, long endTime) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.size = 0;
+  }
+
   public void seek(int beginIndex, int endIndex, long startTime, long endTime) {
     this.beginIndex = beginIndex;
     this.endIndex = endIndex;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnWindow.java
index 64e360d3de..37f22901f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/ElasticSerializableTVListBackedSingleColumnWindow.java
@@ -56,6 +56,9 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
 
   @Override
   public Row getRow(int rowIndex) {
+    if (this.size == 0) {
+      throw new IndexOutOfBoundsException("Size is 0");
+    }
     return row.seek(beginIndex + rowIndex);
   }
 
@@ -66,6 +69,9 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
 
   @Override
   public RowIterator getRowIterator() {
+    if (this.size == 0) {
+      return new EmptyRowIterator();
+    }
     if (rowIterator == null) {
       rowIterator =
           new ElasticSerializableTVListBackedSingleColumnWindowIterator(
@@ -86,6 +92,12 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
     return endTime;
   }
 
+  public void setEmptyWindow(long startTime, long endTime) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.size = 0;
+  }
+
   public void seek(int beginIndex, int endIndex, long startTime, long endTime) {
     this.beginIndex = beginIndex;
     this.endIndex = endIndex;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/EmptyRowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/EmptyRowIterator.java
new file mode 100644
index 0000000000..ebb714e332
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/adapter/EmptyRowIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.dag.adapter;
+
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowIterator;
+
+import java.io.IOException;
+
+public class EmptyRowIterator implements RowIterator {
+  @Override
+  public boolean hasNextRow() {
+    return false;
+  }
+
+  @Override
+  public Row next() throws IOException {
+    throw new UnsupportedOperationException("Can not call next on EmptyRowIterator");
+  }
+
+  @Override
+  public void reset() {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
index 4ac11c77ec..b2c9c731f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
@@ -570,13 +570,18 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
             break;
           }
         }
+        if ((nextIndexEnd == nextIndexBegin)
+            && nextWindowTimeEnd < rowRecordList.getTime(rowRecordList.size() - 1)) {
+          window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+          return YieldableState.YIELDABLE;
+        }
         window.seek(
             nextIndexBegin,
             nextIndexEnd,
             nextWindowTimeBegin,
             nextWindowTimeBegin + timeInterval - 1);
 
-        hasCached = nextIndexBegin != nextIndexEnd;
+        hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == rowRecordList.size());
         return hasCached ? YieldableState.YIELDABLE : YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
       }
 
@@ -626,13 +631,20 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
             break;
           }
         }
+
+        if ((nextIndexEnd == nextIndexBegin)
+            && nextWindowTimeEnd < rowRecordList.getTime(rowRecordList.size() - 1)) {
+          window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+          return true;
+        }
+
         window.seek(
             nextIndexBegin,
             nextIndexEnd,
             nextWindowTimeBegin,
             nextWindowTimeBegin + timeInterval - 1);
 
-        hasCached = nextIndexBegin != nextIndexEnd;
+        hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == rowRecordList.size());
         return hasCached;
       }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
index 5b6216eea0..165a079120 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -459,13 +459,20 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
             break;
           }
         }
+
+        if ((nextIndexEnd == nextIndexBegin)
+            && nextWindowTimeEnd < tvList.getTime(tvList.size() - 1)) {
+          window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+          return YieldableState.YIELDABLE;
+        }
+
         window.seek(
             nextIndexBegin,
             nextIndexEnd,
             nextWindowTimeBegin,
             nextWindowTimeBegin + timeInterval - 1);
 
-        hasCached = nextIndexBegin != nextIndexEnd;
+        hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == tvList.size());
         return hasCached ? YieldableState.YIELDABLE : YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
       }
 
@@ -516,13 +523,20 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
             break;
           }
         }
+
+        if ((nextIndexEnd == nextIndexBegin)
+            && nextWindowTimeEnd < tvList.getTime(tvList.size() - 1)) {
+          window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+          return true;
+        }
+
         window.seek(
             nextIndexBegin,
             nextIndexEnd,
             nextWindowTimeBegin,
             nextWindowTimeBegin + timeInterval - 1);
 
-        hasCached = nextIndexBegin != nextIndexEnd;
+        hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == tvList.size());
         return hasCached;
       }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
index 58b5bbb8ac..d6bd270c41 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -332,13 +332,20 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
             break;
           }
         }
+
+        if ((nextIndexEnd == nextIndexBegin)
+            && nextWindowTimeEnd < tvList.getTime(tvList.size() - 1)) {
+          window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+          return YieldableState.YIELDABLE;
+        }
+
         window.seek(
             nextIndexBegin,
             nextIndexEnd,
             nextWindowTimeBegin,
             nextWindowTimeBegin + timeInterval - 1);
 
-        hasCached = nextIndexBegin != nextIndexEnd;
+        hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == tvList.size());
         return hasCached ? YieldableState.YIELDABLE : YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
       }
 
@@ -388,13 +395,20 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
             break;
           }
         }
+
+        if ((nextIndexEnd == nextIndexBegin)
+            && nextWindowTimeEnd < tvList.getTime(tvList.size() - 1)) {
+          window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
+          return true;
+        }
+
         window.seek(
             nextIndexBegin,
             nextIndexEnd,
             nextWindowTimeBegin,
             nextWindowTimeBegin + timeInterval - 1);
 
-        hasCached = nextIndexBegin != nextIndexEnd;
+        hasCached = !(nextIndexBegin == nextIndexEnd && nextIndexEnd == tvList.size());
         return hasCached;
       }
 
diff --git a/udf-api/src/main/java/org/apache/iotdb/udf/api/access/RowWindow.java b/udf-api/src/main/java/org/apache/iotdb/udf/api/access/RowWindow.java
index 8d94a4cb63..782c98f509 100644
--- a/udf-api/src/main/java/org/apache/iotdb/udf/api/access/RowWindow.java
+++ b/udf-api/src/main/java/org/apache/iotdb/udf/api/access/RowWindow.java
@@ -42,7 +42,8 @@ public interface RowWindow {
    * will not generate a new Row instance.
    *
    * @param rowIndex index of the row to return
-   * @return the row at the specified position in this window
+   * @return the row at the specified position in this window; throws IndexOutOfBoundsException
+   *     if this method is called on an empty RowWindow.
    * @throws IOException if any I/O errors occur
    */
   Row getRow(int rowIndex) throws IOException;
diff --git a/udf-api/src/main/java/org/apache/iotdb/udf/api/collector/PointCollector.java b/udf-api/src/main/java/org/apache/iotdb/udf/api/collector/PointCollector.java
index 40270d1f0d..b5ed276d31 100644
--- a/udf-api/src/main/java/org/apache/iotdb/udf/api/collector/PointCollector.java
+++ b/udf-api/src/main/java/org/apache/iotdb/udf/api/collector/PointCollector.java
@@ -31,7 +31,9 @@ import java.io.IOException;
 
 /**
  * Used to collect time series data points generated by {@link UDTF#transform(Row, PointCollector)},
- * {@link UDTF#transform(RowWindow, PointCollector)} or {@link UDTF#terminate(PointCollector)}.
+ * {@link UDTF#transform(RowWindow, PointCollector)} or {@link
+ * UDTF#terminate(PointCollector)}. Note that a timestamp must not be put into the PointCollector
+ * more than once, or the calculation may stop.
  */
 public interface PointCollector {