You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/06 09:09:52 UTC

[iotdb] 02/04: SingleInputSingleOutputIntermediateLayer: constructRowSlidingTimeWindowReader

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9e61bd6a0d3259a540e71c94f373d1cbdc65bf57
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Sep 6 11:31:08 2021 +0800

    SingleInputSingleOutputIntermediateLayer: constructRowSlidingTimeWindowReader
---
 .../db/query/udf/core/layer/IntermediateLayer.java |   5 +-
 .../SingleInputSingleOutputIntermediateLayer.java  | 175 ++++++++++++++++-----
 2 files changed, 137 insertions(+), 43 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
index 49aae1a..c7ae82c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
 import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
 
+import java.io.IOException;
+
 public abstract class IntermediateLayer {
 
   protected final long queryId;
@@ -61,5 +63,6 @@ public abstract class IntermediateLayer {
       throws QueryProcessException;
 
   protected abstract LayerRowWindowReader constructRowSlidingTimeWindowReader(
-      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB);
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException, IOException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
index 3c1c705..452ad3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
@@ -37,11 +37,13 @@ import java.io.IOException;
 public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer {
 
   private final LayerPointReader parentLayerPointReader;
+  private final TSDataType dataType;
 
   public SingleInputSingleOutputIntermediateLayer(
       long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader) {
     super(queryId, memoryBudgetInMB);
     this.parentLayerPointReader = parentLayerPointReader;
+    dataType = parentLayerPointReader.getDataType();
   }
 
   @Override
@@ -101,7 +103,6 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
       private final int windowSize = strategy.getWindowSize();
       private final int slidingStep = strategy.getSlidingStep();
 
-      private final TSDataType dataType = parentLayerPointReader.getDataType();
       private final ElasticSerializableTVList tvList =
           ElasticSerializableTVList.newElasticSerializableTVList(
               dataType, queryId, memoryBudgetInMB, 2);
@@ -122,7 +123,7 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
 
         int pointsToBeCollected = endIndex - tvList.size();
         if (0 < pointsToBeCollected) {
-          hasCached = collectPoints(pointsToBeCollected) != 0;
+          hasCached = collectPoints(pointsToBeCollected, tvList) != 0;
           window.seek(beginIndex, tvList.size());
         } else {
           hasCached = true;
@@ -132,50 +133,105 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
         return hasCached;
       }
 
-      /** @return number of actually collected, which may be less than or equals to pointNumber */
-      private int collectPoints(int pointNumber) throws QueryProcessException, IOException {
-        int count = 0;
-
-        while (parentLayerPointReader.next() && count < pointNumber) {
-          ++count;
-
-          switch (dataType) {
-            case INT32:
-              tvList.putInt(
-                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
-              break;
-            case INT64:
-              tvList.putLong(
-                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
-              break;
-            case FLOAT:
-              tvList.putFloat(
-                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
-              break;
-            case DOUBLE:
-              tvList.putDouble(
-                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
-              break;
-            case BOOLEAN:
-              tvList.putBoolean(
-                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
-              break;
-            case TEXT:
-              tvList.putBinary(
-                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
-              break;
-            default:
+      @Override
+      public void readyForNext() {
+        hasCached = false;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException, IOException {
+
+    final long timeInterval = strategy.getTimeInterval();
+    final long slidingStep = strategy.getSlidingStep();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+
+    final ElasticSerializableTVList tvList =
+        ElasticSerializableTVList.newElasticSerializableTVList(
+            dataType, queryId, memoryBudgetInMB, 2);
+    final ElasticSerializableTVListBackedSingleColumnWindow window =
+        new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+    long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin();
+    if (tvList.size() == 0 && parentLayerPointReader.next()) {
+      collectPoints(1, tvList);
+
+      if (nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+        // display window begin should be set to the same as the min timestamp of the query result
+        // set
+        nextWindowTimeBeginGivenByStrategy = tvList.getTime(0);
+      }
+    }
+    long finalNextWindowTimeBeginGivenByStrategy = nextWindowTimeBeginGivenByStrategy;
+
+    final boolean hasAtLeastOneRow = tvList.size() != 0;
+
+    return new LayerRowWindowReader() {
+
+      private long nextWindowTimeBegin = finalNextWindowTimeBeginGivenByStrategy;
+      private int nextIndexBegin = 0;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (displayWindowEnd <= nextWindowTimeBegin) {
+          return false;
+        }
+        if (!hasAtLeastOneRow || 0 < tvList.size()) {
+          return true;
+        }
+
+        long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd);
+        int oldTVListSize = tvList.size();
+        while (tvList.getTime(tvList.size() - 1) < nextWindowTimeEnd) {
+          if (parentLayerPointReader.next()) {
+            collectPoints(1, tvList);
+          } else if (displayWindowEnd == Long.MAX_VALUE
+              // display window end == the max timestamp of the query result set
+              && oldTVListSize == tvList.size()) {
+            return false;
+          } else {
+            break;
           }
+        }
 
-          parentLayerPointReader.readyForNext();
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeBegin <= tvList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          if (i == tvList.size() - 1) {
+            nextIndexBegin = tvList.size();
+          }
         }
 
-        return count;
+        int nextIndexEnd = tvList.size();
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeEnd <= tvList.getTime(i)) {
+            nextIndexEnd = i;
+            break;
+          }
+        }
+        window.seek(nextIndexBegin, nextIndexEnd);
+
+        return true;
       }
 
       @Override
       public void readyForNext() {
-        hasCached = false;
+        nextWindowTimeBegin += slidingStep;
       }
 
       @Override
@@ -190,9 +246,44 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
     };
   }
 
-  @Override
-  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
-      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
-    return null;
+  /** @return number of points actually collected, which may be less than or equal to pointNumber */
+  private int collectPoints(int pointNumber, ElasticSerializableTVList tvList)
+      throws QueryProcessException, IOException {
+    int count = 0;
+
+    while (parentLayerPointReader.next() && count < pointNumber) {
+      ++count;
+
+      switch (dataType) {
+        case INT32:
+          tvList.putInt(parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
+          break;
+        case INT64:
+          tvList.putLong(
+              parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
+          break;
+        case FLOAT:
+          tvList.putFloat(
+              parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
+          break;
+        case DOUBLE:
+          tvList.putDouble(
+              parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
+          break;
+        case BOOLEAN:
+          tvList.putBoolean(
+              parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
+          break;
+        case TEXT:
+          tvList.putBinary(
+              parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
+          break;
+        default:
+      }
+
+      parentLayerPointReader.readyForNext();
+    }
+
+    return count;
   }
 }