You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/06 09:09:50 UTC

[iotdb] branch nested-operations updated (f47791a -> 205c3ba)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


    from f47791a  new access data structures
     new 025fd00  SingleInputSingleOutputIntermediateLayer: constructPointReader & constructRowReader & constructRowSlidingSizeWindowReader
     new 9e61bd6  SingleInputSingleOutputIntermediateLayer: constructRowSlidingTimeWindowReader
     new 3631c85  SingleInputMultiOutputIntermediateLayer & SingleInputSingleOutputIntermediateLayer
     new 205c3ba  rename

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../query/expression/binary/BinaryExpression.java  |   4 +-
 .../query/expression/unary/FunctionExpression.java |   5 +-
 .../query/expression/unary/NegationExpression.java |   4 +-
 .../query/expression/unary/TimeSeriesOperand.java  |   4 +-
 ...SerializableTVListBackedSingleColumnWindow.java |   4 +-
 .../db/query/udf/core/layer/IntermediateLayer.java |  13 +-
 .../db/query/udf/core/layer/LayerCacheUtils.java   |  81 +++++
 ...nputColumnMultiReferenceIntermediateLayer.java} |  51 ++--
 ...putColumnSingleReferenceIntermediateLayer.java} |  51 ++--
 .../MultiInputMultiOutputIntermediateLayer.java    |  35 ---
 .../MultiInputSingleOutputIntermediateLayer.java   |  35 ---
 ...InputColumnMultiReferenceIntermediateLayer.java | 336 +++++++++++++++++++++
 ...nputColumnSingleReferenceIntermediateLayer.java | 248 +++++++++++++++
 .../SingleInputMultiOutputIntermediateLayer.java   | 165 ----------
 .../SingleInputSingleOutputIntermediateLayer.java  |  36 ---
 .../iotdb/db/query/udf/core/layer/UDFLayer.java    |  28 +-
 .../db/query/udf/core/reader/LayerRowReader.java   |   2 +-
 17 files changed, 744 insertions(+), 358 deletions(-)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java
 copy server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/{IntermediateLayer.java => MultiInputColumnMultiReferenceIntermediateLayer.java} (51%)
 copy server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/{IntermediateLayer.java => MultiInputColumnSingleReferenceIntermediateLayer.java} (51%)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java

[iotdb] 02/04: SingleInputSingleOutputIntermediateLayer: constructRowSlidingTimeWindowReader

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9e61bd6a0d3259a540e71c94f373d1cbdc65bf57
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Sep 6 11:31:08 2021 +0800

    SingleInputSingleOutputIntermediateLayer: constructRowSlidingTimeWindowReader
---
 .../db/query/udf/core/layer/IntermediateLayer.java |   5 +-
 .../SingleInputSingleOutputIntermediateLayer.java  | 175 ++++++++++++++++-----
 2 files changed, 137 insertions(+), 43 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
index 49aae1a..c7ae82c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
 import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
 
+import java.io.IOException;
+
 public abstract class IntermediateLayer {
 
   protected final long queryId;
@@ -61,5 +63,6 @@ public abstract class IntermediateLayer {
       throws QueryProcessException;
 
   protected abstract LayerRowWindowReader constructRowSlidingTimeWindowReader(
-      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB);
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException, IOException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
index 3c1c705..452ad3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
@@ -37,11 +37,13 @@ import java.io.IOException;
 public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer {
 
   private final LayerPointReader parentLayerPointReader;
+  private final TSDataType dataType;
 
   public SingleInputSingleOutputIntermediateLayer(
       long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader) {
     super(queryId, memoryBudgetInMB);
     this.parentLayerPointReader = parentLayerPointReader;
+    dataType = parentLayerPointReader.getDataType();
   }
 
   @Override
@@ -101,7 +103,6 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
       private final int windowSize = strategy.getWindowSize();
       private final int slidingStep = strategy.getSlidingStep();
 
-      private final TSDataType dataType = parentLayerPointReader.getDataType();
       private final ElasticSerializableTVList tvList =
           ElasticSerializableTVList.newElasticSerializableTVList(
               dataType, queryId, memoryBudgetInMB, 2);
@@ -122,7 +123,7 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
 
         int pointsToBeCollected = endIndex - tvList.size();
         if (0 < pointsToBeCollected) {
-          hasCached = collectPoints(pointsToBeCollected) != 0;
+          hasCached = collectPoints(pointsToBeCollected, tvList) != 0;
           window.seek(beginIndex, tvList.size());
         } else {
           hasCached = true;
@@ -132,50 +133,105 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
         return hasCached;
       }
 
-      /** @return number of actually collected, which may be less than or equals to pointNumber */
-      private int collectPoints(int pointNumber) throws QueryProcessException, IOException {
-        int count = 0;
-
-        while (parentLayerPointReader.next() && count < pointNumber) {
-          ++count;
-
-          switch (dataType) {
-            case INT32:
-              tvList.putInt(
-                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
-              break;
-            case INT64:
-              tvList.putLong(
-                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
-              break;
-            case FLOAT:
-              tvList.putFloat(
-                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
-              break;
-            case DOUBLE:
-              tvList.putDouble(
-                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
-              break;
-            case BOOLEAN:
-              tvList.putBoolean(
-                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
-              break;
-            case TEXT:
-              tvList.putBinary(
-                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
-              break;
-            default:
+      @Override
+      public void readyForNext() {
+        hasCached = false;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException, IOException {
+
+    final long timeInterval = strategy.getTimeInterval();
+    final long slidingStep = strategy.getSlidingStep();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+
+    final ElasticSerializableTVList tvList =
+        ElasticSerializableTVList.newElasticSerializableTVList(
+            dataType, queryId, memoryBudgetInMB, 2);
+    final ElasticSerializableTVListBackedSingleColumnWindow window =
+        new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+    long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin();
+    if (tvList.size() == 0 && parentLayerPointReader.next()) {
+      collectPoints(1, tvList);
+
+      if (nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+        // display window begin should be set to the same as the min timestamp of the query result
+        // set
+        nextWindowTimeBeginGivenByStrategy = tvList.getTime(0);
+      }
+    }
+    long finalNextWindowTimeBeginGivenByStrategy = nextWindowTimeBeginGivenByStrategy;
+
+    final boolean hasAtLeastOneRow = tvList.size() != 0;
+
+    return new LayerRowWindowReader() {
+
+      private long nextWindowTimeBegin = finalNextWindowTimeBeginGivenByStrategy;
+      private int nextIndexBegin = 0;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (displayWindowEnd <= nextWindowTimeBegin) {
+          return false;
+        }
+        if (!hasAtLeastOneRow || 0 < tvList.size()) {
+          return true;
+        }
+
+        long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd);
+        int oldTVListSize = tvList.size();
+        while (tvList.getTime(tvList.size() - 1) < nextWindowTimeEnd) {
+          if (parentLayerPointReader.next()) {
+            collectPoints(1, tvList);
+          } else if (displayWindowEnd == Long.MAX_VALUE
+              // display window end == the max timestamp of the query result set
+              && oldTVListSize == tvList.size()) {
+            return false;
+          } else {
+            break;
           }
+        }
 
-          parentLayerPointReader.readyForNext();
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeBegin <= tvList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          if (i == tvList.size() - 1) {
+            nextIndexBegin = tvList.size();
+          }
         }
 
-        return count;
+        int nextIndexEnd = tvList.size();
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeEnd <= tvList.getTime(i)) {
+            nextIndexEnd = i;
+            break;
+          }
+        }
+        window.seek(nextIndexBegin, nextIndexEnd);
+
+        return true;
       }
 
       @Override
       public void readyForNext() {
-        hasCached = false;
+        nextWindowTimeBegin += slidingStep;
       }
 
       @Override
@@ -190,9 +246,44 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
     };
   }
 
-  @Override
-  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
-      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
-    return null;
+  /** @return number of actually collected, which may be less than or equals to pointNumber */
+  private int collectPoints(int pointNumber, ElasticSerializableTVList tvList)
+      throws QueryProcessException, IOException {
+    int count = 0;
+
+    while (parentLayerPointReader.next() && count < pointNumber) {
+      ++count;
+
+      switch (dataType) {
+        case INT32:
+          tvList.putInt(parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
+          break;
+        case INT64:
+          tvList.putLong(
+              parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
+          break;
+        case FLOAT:
+          tvList.putFloat(
+              parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
+          break;
+        case DOUBLE:
+          tvList.putDouble(
+              parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
+          break;
+        case BOOLEAN:
+          tvList.putBoolean(
+              parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
+          break;
+        case TEXT:
+          tvList.putBinary(
+              parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
+          break;
+        default:
+      }
+
+      parentLayerPointReader.readyForNext();
+    }
+
+    return count;
   }
 }

[iotdb] 01/04: SingleInputSingleOutputIntermediateLayer: constructPointReader & constructRowReader & constructRowSlidingSizeWindowReader

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 025fd00aa3766e50e0baf0717b53893abf105375
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Sep 6 10:37:12 2021 +0800

    SingleInputSingleOutputIntermediateLayer: constructPointReader & constructRowReader & constructRowSlidingSizeWindowReader
---
 ...SerializableTVListBackedSingleColumnWindow.java |   4 +-
 .../db/query/udf/core/layer/IntermediateLayer.java |   8 +-
 .../MultiInputMultiOutputIntermediateLayer.java    |  27 +++-
 .../MultiInputSingleOutputIntermediateLayer.java   |  27 +++-
 .../SingleInputMultiOutputIntermediateLayer.java   |  29 +++-
 .../SingleInputSingleOutputIntermediateLayer.java  | 166 ++++++++++++++++++++-
 .../iotdb/db/query/udf/core/layer/UDFLayer.java    |  28 ++--
 .../db/query/udf/core/reader/LayerRowReader.java   |   2 +-
 8 files changed, 262 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
index f91dd35..4157f90 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
@@ -76,14 +76,12 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
     return rowIterator;
   }
 
-  private RowWindow seek(int beginIndex, int endIndex) {
+  public void seek(int beginIndex, int endIndex) {
     this.beginIndex = beginIndex;
     this.endIndex = endIndex;
     size = endIndex - beginIndex;
 
     row.seek(beginIndex);
     rowIterator = null;
-
-    return this;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
index 069af89..49aae1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.query.udf.core.layer;
 
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
@@ -41,7 +42,7 @@ public abstract class IntermediateLayer {
   public abstract LayerRowReader constructRowReader();
 
   public final LayerRowWindowReader constructRowWindowReader(
-      AccessStrategy strategy, float memoryBudgetInMB) {
+      AccessStrategy strategy, float memoryBudgetInMB) throws QueryProcessException {
     switch (strategy.getAccessStrategyType()) {
       case SLIDING_TIME_WINDOW:
         return constructRowSlidingTimeWindowReader(
@@ -56,10 +57,9 @@ public abstract class IntermediateLayer {
   }
 
   protected abstract LayerRowWindowReader constructRowSlidingSizeWindowReader(
-      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB);
+      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException;
 
   protected abstract LayerRowWindowReader constructRowSlidingTimeWindowReader(
       SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB);
-
-  public abstract void updateEvictionUpperBound();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java
index 9cd5422..2a70a7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java
@@ -19,17 +19,40 @@
 
 package org.apache.iotdb.db.query.udf.core.layer;
 
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
 
 import java.util.List;
 
-public class MultiInputMultiOutputIntermediateLayer implements IntermediateLayer {
+public class MultiInputMultiOutputIntermediateLayer extends IntermediateLayer {
 
   public MultiInputMultiOutputIntermediateLayer(
-      List<LayerPointReader> parentLayerPointReaders, long queryId, float memoryBudgetInMB) {}
+      long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) {
+    super(queryId, memoryBudgetInMB);
+  }
 
   @Override
   public LayerPointReader constructPointReader() {
     return null;
   }
+
+  @Override
+  public LayerRowReader constructRowReader() {
+    return null;
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java
index 71a4f06..c63b1be 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java
@@ -19,17 +19,40 @@
 
 package org.apache.iotdb.db.query.udf.core.layer;
 
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
 
 import java.util.List;
 
-public class MultiInputSingleOutputIntermediateLayer implements IntermediateLayer {
+public class MultiInputSingleOutputIntermediateLayer extends IntermediateLayer {
 
   public MultiInputSingleOutputIntermediateLayer(
-      List<LayerPointReader> parentLayerPointReaders, long queryId, float memoryBudgetInMB) {}
+      long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) {
+    super(queryId, memoryBudgetInMB);
+  }
 
   @Override
   public LayerPointReader constructPointReader() {
     return null;
   }
+
+  @Override
+  public LayerRowReader constructRowReader() {
+    return null;
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
index a711ced..17467b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
@@ -20,27 +20,33 @@
 package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
 import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.io.IOException;
 
-public class SingleInputMultiOutputIntermediateLayer implements IntermediateLayer {
+public class SingleInputMultiOutputIntermediateLayer extends IntermediateLayer {
 
   private static final int CACHE_BLOCK_SIZE = 2;
 
-  private final TSDataType dataType;
   private final LayerPointReader parentLayerPointReader;
+  private final TSDataType dataType;
   private final ElasticSerializableTVList tvList;
   private final SafetyLine safetyLine;
 
   public SingleInputMultiOutputIntermediateLayer(
-      LayerPointReader parentLayerPointReader, long queryId, float memoryBudgetInMB)
+      long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader)
       throws QueryProcessException {
+    super(queryId, memoryBudgetInMB);
     this.parentLayerPointReader = parentLayerPointReader;
+
     dataType = parentLayerPointReader.getDataType();
     tvList =
         ElasticSerializableTVList.newElasticSerializableTVList(
@@ -162,4 +168,21 @@ public class SingleInputMultiOutputIntermediateLayer implements IntermediateLaye
       }
     };
   }
+
+  @Override
+  public LayerRowReader constructRowReader() {
+    return null;
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
index 131472a..3c1c705 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
@@ -19,13 +19,28 @@
 
 package org.apache.iotdb.db.query.udf.core.layer;
 
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.core.access.ElasticSerializableTVListBackedSingleColumnWindow;
+import org.apache.iotdb.db.query.udf.core.access.LayerPointReaderBackedSingleColumnRow;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
+import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-public class SingleInputSingleOutputIntermediateLayer implements IntermediateLayer {
+import java.io.IOException;
+
+public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer {
 
   private final LayerPointReader parentLayerPointReader;
 
-  public SingleInputSingleOutputIntermediateLayer(LayerPointReader parentLayerPointReader) {
+  public SingleInputSingleOutputIntermediateLayer(
+      long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader) {
+    super(queryId, memoryBudgetInMB);
     this.parentLayerPointReader = parentLayerPointReader;
   }
 
@@ -33,4 +48,151 @@ public class SingleInputSingleOutputIntermediateLayer implements IntermediateLay
   public LayerPointReader constructPointReader() {
     return parentLayerPointReader;
   }
+
+  @Override
+  public LayerRowReader constructRowReader() {
+
+    return new LayerRowReader() {
+
+      private final Row row = new LayerPointReaderBackedSingleColumnRow(parentLayerPointReader);
+      private final TSDataType[] dataTypes =
+          new TSDataType[] {parentLayerPointReader.getDataType()};
+      private boolean hasCached = false;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (hasCached) {
+          return true;
+        }
+        hasCached = parentLayerPointReader.next();
+        return hasCached;
+      }
+
+      @Override
+      public void readyForNext() {
+        parentLayerPointReader.readyForNext();
+        hasCached = false;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return dataTypes;
+      }
+
+      @Override
+      public long currentTime() throws IOException {
+        return parentLayerPointReader.currentTime();
+      }
+
+      @Override
+      public Row currentRow() {
+        return row;
+      }
+    };
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException {
+
+    return new LayerRowWindowReader() {
+
+      private final int windowSize = strategy.getWindowSize();
+      private final int slidingStep = strategy.getSlidingStep();
+
+      private final TSDataType dataType = parentLayerPointReader.getDataType();
+      private final ElasticSerializableTVList tvList =
+          ElasticSerializableTVList.newElasticSerializableTVList(
+              dataType, queryId, memoryBudgetInMB, 2);
+      private final ElasticSerializableTVListBackedSingleColumnWindow window =
+          new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+      private boolean hasCached = false;
+      private int beginIndex = -slidingStep;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (hasCached) {
+          return true;
+        }
+
+        beginIndex += slidingStep;
+        int endIndex = beginIndex + windowSize;
+
+        int pointsToBeCollected = endIndex - tvList.size();
+        if (0 < pointsToBeCollected) {
+          hasCached = collectPoints(pointsToBeCollected) != 0;
+          window.seek(beginIndex, tvList.size());
+        } else {
+          hasCached = true;
+          window.seek(beginIndex, endIndex);
+        }
+
+        return hasCached;
+      }
+
+      /** @return number of actually collected, which may be less than or equals to pointNumber */
+      private int collectPoints(int pointNumber) throws QueryProcessException, IOException {
+        int count = 0;
+
+        while (parentLayerPointReader.next() && count < pointNumber) {
+          ++count;
+
+          switch (dataType) {
+            case INT32:
+              tvList.putInt(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
+              break;
+            case INT64:
+              tvList.putLong(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
+              break;
+            case FLOAT:
+              tvList.putFloat(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
+              break;
+            case DOUBLE:
+              tvList.putDouble(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
+              break;
+            case BOOLEAN:
+              tvList.putBoolean(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
+              break;
+            case TEXT:
+              tvList.putBinary(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
+              break;
+            default:
+          }
+
+          parentLayerPointReader.readyForNext();
+        }
+
+        return count;
+      }
+
+      @Override
+      public void readyForNext() {
+        hasCached = false;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
index a613c4c..3148dd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
@@ -31,8 +31,8 @@ import org.apache.iotdb.db.query.udf.api.access.RowWindow;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.core.access.RowImpl;
-import org.apache.iotdb.db.query.udf.core.access.RowWindowImpl;
+import org.apache.iotdb.db.query.udf.core.access.MultiColumnRow;
+import org.apache.iotdb.db.query.udf.core.access.MultiColumnWindow;
 import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
@@ -68,7 +68,7 @@ public class UDFLayer {
       List<TSDataType> dataTypes,
       List<ManagedSeriesReader> readers)
       throws QueryProcessException, IOException, InterruptedException {
-    constructInputLayer(
+    construct(
         queryId,
         memoryBudgetInMB,
         new RawQueryDataSetWithoutValueFilter(queryId, paths, dataTypes, readers, true));
@@ -84,14 +84,18 @@ public class UDFLayer {
       List<IReaderByTimestamp> readers,
       List<Boolean> cached)
       throws QueryProcessException {
-    constructInputLayer(
+    construct(
         queryId,
         memoryBudgetInMB,
         new RawQueryDataSetWithValueFilter(paths, dataTypes, timeGenerator, readers, cached, true));
   }
 
-  private void constructInputLayer(
-      long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
+  public UDFLayer(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
+      throws QueryProcessException {
+    construct(queryId, memoryBudgetInMB, queryDataSet);
+  }
+
+  private void construct(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
       throws QueryProcessException {
     this.queryId = queryId;
     this.queryDataSet = queryDataSet;
@@ -242,7 +246,7 @@ public class UDFLayer {
     private boolean hasCachedRowRecord;
     private Object[] cachedRowRecord;
 
-    private final RowImpl row;
+    private final MultiColumnRow row;
 
     public InputLayerRowReader(int[] columnIndexes) {
       safetyPile = safetyLine.addSafetyPile();
@@ -253,7 +257,7 @@ public class UDFLayer {
       hasCachedRowRecord = false;
       cachedRowRecord = null;
 
-      row = new RowImpl(columnIndexes, dataTypes);
+      row = new MultiColumnRow(columnIndexes, dataTypes);
     }
 
     @Override
@@ -321,7 +325,7 @@ public class UDFLayer {
 
     private final int windowSize;
     private final IntList rowIndexes;
-    private final RowWindowImpl rowWindow;
+    private final MultiColumnWindow rowWindow;
 
     private final int slidingStep;
 
@@ -343,7 +347,7 @@ public class UDFLayer {
           windowSize < SerializableIntList.calculateCapacity(memoryBudgetInMB)
               ? new WrappedIntArray(windowSize)
               : new ElasticSerializableIntList(queryId, memoryBudgetInMB, 2);
-      rowWindow = new RowWindowImpl(rowRecordList, columnIndexes, dataTypes, rowIndexes);
+      rowWindow = new MultiColumnWindow(rowRecordList, columnIndexes, dataTypes, rowIndexes);
 
       slidingStep = accessStrategy.getSlidingStep();
 
@@ -452,7 +456,7 @@ public class UDFLayer {
     private final long displayWindowEnd;
 
     private final IntList rowIndexes;
-    private final RowWindowImpl rowWindow;
+    private final MultiColumnWindow rowWindow;
 
     private long nextWindowTimeBegin;
     private int nextIndexBegin;
@@ -475,7 +479,7 @@ public class UDFLayer {
       displayWindowEnd = accessStrategy.getDisplayWindowEnd();
 
       rowIndexes = new ElasticSerializableIntList(queryId, memoryBudgetInMB, 2);
-      rowWindow = new RowWindowImpl(rowRecordList, columnIndexes, dataTypes, rowIndexes);
+      rowWindow = new MultiColumnWindow(rowRecordList, columnIndexes, dataTypes, rowIndexes);
 
       nextWindowTimeBegin = accessStrategy.getDisplayWindowBegin();
       nextIndexBegin = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java
index 413a24d..0bc6521 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java
@@ -33,7 +33,7 @@ public interface LayerRowReader {
 
   TSDataType[] getDataTypes();
 
-  long currentTime();
+  long currentTime() throws IOException;
 
   Row currentRow();
 }

[iotdb] 03/04: SingleInputMultiOutputIntermediateLayer & SingleInputSingleOutputIntermediateLayer

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3631c85afc31be9bdf1bf4b59c0c6e7e618caa4c
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Sep 6 15:43:30 2021 +0800

    SingleInputMultiOutputIntermediateLayer & SingleInputSingleOutputIntermediateLayer
---
 .../db/query/udf/core/layer/LayerCacheUtils.java   |  81 +++++++
 .../SingleInputMultiOutputIntermediateLayer.java   | 248 ++++++++++++++++-----
 .../SingleInputSingleOutputIntermediateLayer.java  |  89 ++------
 3 files changed, 303 insertions(+), 115 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java
new file mode 100644
index 0000000..5801eef
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerCacheUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+
+public class LayerCacheUtils {
+
+  private LayerCacheUtils() {}
+
+  /** @return number of actually collected, which may be less than or equals to pointNumber */
+  public static int cachePoints(
+      TSDataType dataType,
+      LayerPointReader source,
+      ElasticSerializableTVList target,
+      int pointNumber)
+      throws QueryProcessException, IOException {
+    int count = 0;
+    while (count < pointNumber && cachePoint(dataType, source, target)) {
+      ++count;
+    }
+    return count;
+  }
+
+  public static boolean cachePoint(
+      TSDataType dataType, LayerPointReader source, ElasticSerializableTVList target)
+      throws IOException, QueryProcessException {
+    if (!source.next()) {
+      return false;
+    }
+
+    switch (dataType) {
+      case INT32:
+        target.putInt(source.currentTime(), source.currentInt());
+        break;
+      case INT64:
+        target.putLong(source.currentTime(), source.currentLong());
+        break;
+      case FLOAT:
+        target.putFloat(source.currentTime(), source.currentFloat());
+        break;
+      case DOUBLE:
+        target.putDouble(source.currentTime(), source.currentDouble());
+        break;
+      case BOOLEAN:
+        target.putBoolean(source.currentTime(), source.currentBoolean());
+        break;
+      case TEXT:
+        target.putBinary(source.currentTime(), source.currentBinary());
+        break;
+      default:
+        throw new UnsupportedOperationException(dataType.name());
+    }
+
+    source.readyForNext();
+
+    return true;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
index 17467b8..b243e54 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
@@ -20,8 +20,12 @@
 package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.core.access.ElasticSerializableTVListBackedSingleColumnRow;
+import org.apache.iotdb.db.query.udf.core.access.ElasticSerializableTVListBackedSingleColumnWindow;
 import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
@@ -66,20 +70,9 @@ public class SingleInputMultiOutputIntermediateLayer extends IntermediateLayer {
 
       @Override
       public boolean next() throws QueryProcessException, IOException {
-        if (hasCached) {
-          return true;
-        }
-
-        if (currentPointIndex < tvList.size() - 1) {
-          ++currentPointIndex;
-          hasCached = true;
-        }
-
-        // tvList.size() - 1 <= currentPointIndex
-        if (!hasCached && parentLayerPointReader.next()) {
-          cachePoint();
-          parentLayerPointReader.readyForNext();
-
+        if (!hasCached
+            && (currentPointIndex < tvList.size() - 1
+                || LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList))) {
           ++currentPointIndex;
           hasCached = true;
         }
@@ -87,43 +80,11 @@ public class SingleInputMultiOutputIntermediateLayer extends IntermediateLayer {
         return hasCached;
       }
 
-      private void cachePoint() throws IOException, QueryProcessException {
-        switch (dataType) {
-          case INT32:
-            tvList.putInt(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
-            break;
-          case INT64:
-            tvList.putLong(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
-            break;
-          case FLOAT:
-            tvList.putFloat(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
-            break;
-          case DOUBLE:
-            tvList.putDouble(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
-            break;
-          case BOOLEAN:
-            tvList.putBoolean(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
-            break;
-          case TEXT:
-            tvList.putBinary(
-                parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
-            break;
-          default:
-            throw new UnsupportedOperationException(dataType.name());
-        }
-      }
-
       @Override
       public void readyForNext() {
         hasCached = false;
 
         safetyPile.moveForwardTo(currentPointIndex + 1);
-        // todo: reduce the update rate
         tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
       }
 
@@ -171,18 +132,205 @@ public class SingleInputMultiOutputIntermediateLayer extends IntermediateLayer {
 
   @Override
   public LayerRowReader constructRowReader() {
-    return null;
+
+    return new LayerRowReader() {
+
+      private final SafetyPile safetyPile = safetyLine.addSafetyPile();
+      private final ElasticSerializableTVListBackedSingleColumnRow row =
+          new ElasticSerializableTVListBackedSingleColumnRow(tvList, -1);
+
+      private boolean hasCached = false;
+      private int currentRowIndex = -1;
+
+      @Override
+      public boolean next() throws QueryProcessException, IOException {
+        if (!hasCached
+            && ((currentRowIndex < tvList.size() - 1)
+                || LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList))) {
+          row.seek(++currentRowIndex);
+          hasCached = true;
+        }
+
+        return hasCached;
+      }
+
+      @Override
+      public void readyForNext() {
+        hasCached = false;
+
+        safetyPile.moveForwardTo(currentRowIndex + 1);
+        tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public long currentTime() throws IOException {
+        return row.getTime();
+      }
+
+      @Override
+      public Row currentRow() {
+        return row;
+      }
+    };
   }
 
   @Override
   protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
       SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
-    return null;
+
+    return new LayerRowWindowReader() {
+
+      private final int windowSize = strategy.getWindowSize();
+      private final int slidingStep = strategy.getSlidingStep();
+
+      private final SafetyPile safetyPile = safetyLine.addSafetyPile();
+      private final ElasticSerializableTVListBackedSingleColumnWindow window =
+          new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+      private boolean hasCached = false;
+      private int beginIndex = -slidingStep;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (hasCached) {
+          return true;
+        }
+
+        beginIndex += slidingStep;
+        int endIndex = beginIndex + windowSize;
+
+        int pointsToBeCollected = endIndex - tvList.size();
+        if (0 < pointsToBeCollected) {
+          hasCached =
+              LayerCacheUtils.cachePoints(
+                      dataType, parentLayerPointReader, tvList, pointsToBeCollected)
+                  != 0;
+          window.seek(beginIndex, tvList.size());
+        } else {
+          hasCached = true;
+          window.seek(beginIndex, endIndex);
+        }
+
+        return hasCached;
+      }
+
+      @Override
+      public void readyForNext() {
+        hasCached = false;
+
+        safetyPile.moveForwardTo(beginIndex + 1);
+        tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
   }
 
   @Override
   protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
-      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
-    return null;
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws IOException, QueryProcessException {
+
+    final long timeInterval = strategy.getTimeInterval();
+    final long slidingStep = strategy.getSlidingStep();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+
+    final SafetyPile safetyPile = safetyLine.addSafetyPile();
+    final ElasticSerializableTVListBackedSingleColumnWindow window =
+        new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+    long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin();
+    if (tvList.size() == 0
+        && LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList)
+        && nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+      // display window begin should be set to the same as the min timestamp of the query result
+      // set
+      nextWindowTimeBeginGivenByStrategy = tvList.getTime(0);
+    }
+    long finalNextWindowTimeBeginGivenByStrategy = nextWindowTimeBeginGivenByStrategy;
+
+    final boolean hasAtLeastOneRow = tvList.size() != 0;
+
+    return new LayerRowWindowReader() {
+
+      private long nextWindowTimeBegin = finalNextWindowTimeBeginGivenByStrategy;
+      private int nextIndexBegin = 0;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (displayWindowEnd <= nextWindowTimeBegin) {
+          return false;
+        }
+        if (!hasAtLeastOneRow || 0 < tvList.size()) {
+          return true;
+        }
+
+        long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd);
+        int oldTVListSize = tvList.size();
+        while (tvList.getTime(tvList.size() - 1) < nextWindowTimeEnd) {
+          if (!LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList)) {
+            if (displayWindowEnd == Long.MAX_VALUE
+                // display window end == the max timestamp of the query result set
+                && oldTVListSize == tvList.size()) {
+              return false;
+            } else {
+              break;
+            }
+          }
+        }
+
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeBegin <= tvList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          if (i == tvList.size() - 1) {
+            nextIndexBegin = tvList.size();
+          }
+        }
+
+        int nextIndexEnd = tvList.size();
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeEnd <= tvList.getTime(i)) {
+            nextIndexEnd = i;
+            break;
+          }
+        }
+        window.seek(nextIndexBegin, nextIndexEnd);
+
+        return true;
+      }
+
+      @Override
+      public void readyForNext() {
+        nextWindowTimeBegin += slidingStep;
+
+        safetyPile.moveForwardTo(nextIndexBegin + 1);
+        tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
index 452ad3f..86111c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
@@ -57,28 +57,27 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
     return new LayerRowReader() {
 
       private final Row row = new LayerPointReaderBackedSingleColumnRow(parentLayerPointReader);
-      private final TSDataType[] dataTypes =
-          new TSDataType[] {parentLayerPointReader.getDataType()};
+
       private boolean hasCached = false;
 
       @Override
       public boolean next() throws IOException, QueryProcessException {
-        if (hasCached) {
-          return true;
+        if (!hasCached) {
+          hasCached = parentLayerPointReader.next();
         }
-        hasCached = parentLayerPointReader.next();
         return hasCached;
       }
 
       @Override
       public void readyForNext() {
-        parentLayerPointReader.readyForNext();
         hasCached = false;
+
+        parentLayerPointReader.readyForNext();
       }
 
       @Override
       public TSDataType[] getDataTypes() {
-        return dataTypes;
+        return new TSDataType[] {parentLayerPointReader.getDataType()};
       }
 
       @Override
@@ -123,7 +122,10 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
 
         int pointsToBeCollected = endIndex - tvList.size();
         if (0 < pointsToBeCollected) {
-          hasCached = collectPoints(pointsToBeCollected, tvList) != 0;
+          hasCached =
+              LayerCacheUtils.cachePoints(
+                      dataType, parentLayerPointReader, tvList, pointsToBeCollected)
+                  != 0;
           window.seek(beginIndex, tvList.size());
         } else {
           hasCached = true;
@@ -166,14 +168,12 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
         new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
 
     long nextWindowTimeBeginGivenByStrategy = strategy.getDisplayWindowBegin();
-    if (tvList.size() == 0 && parentLayerPointReader.next()) {
-      collectPoints(1, tvList);
-
-      if (nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
-        // display window begin should be set to the same as the min timestamp of the query result
-        // set
-        nextWindowTimeBeginGivenByStrategy = tvList.getTime(0);
-      }
+    if (tvList.size() == 0
+        && LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList)
+        && nextWindowTimeBeginGivenByStrategy == Long.MIN_VALUE) {
+      // display window begin should be set to the same as the min timestamp of the query result
+      // set
+      nextWindowTimeBeginGivenByStrategy = tvList.getTime(0);
     }
     long finalNextWindowTimeBeginGivenByStrategy = nextWindowTimeBeginGivenByStrategy;
 
@@ -196,14 +196,14 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
         long nextWindowTimeEnd = Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd);
         int oldTVListSize = tvList.size();
         while (tvList.getTime(tvList.size() - 1) < nextWindowTimeEnd) {
-          if (parentLayerPointReader.next()) {
-            collectPoints(1, tvList);
-          } else if (displayWindowEnd == Long.MAX_VALUE
-              // display window end == the max timestamp of the query result set
-              && oldTVListSize == tvList.size()) {
-            return false;
-          } else {
-            break;
+          if (!LayerCacheUtils.cachePoint(dataType, parentLayerPointReader, tvList)) {
+            if (displayWindowEnd == Long.MAX_VALUE
+                // display window end == the max timestamp of the query result set
+                && oldTVListSize == tvList.size()) {
+              return false;
+            } else {
+              break;
+            }
           }
         }
 
@@ -245,45 +245,4 @@ public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer
       }
     };
   }
-
-  /** @return number of actually collected, which may be less than or equals to pointNumber */
-  private int collectPoints(int pointNumber, ElasticSerializableTVList tvList)
-      throws QueryProcessException, IOException {
-    int count = 0;
-
-    while (parentLayerPointReader.next() && count < pointNumber) {
-      ++count;
-
-      switch (dataType) {
-        case INT32:
-          tvList.putInt(parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
-          break;
-        case INT64:
-          tvList.putLong(
-              parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
-          break;
-        case FLOAT:
-          tvList.putFloat(
-              parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
-          break;
-        case DOUBLE:
-          tvList.putDouble(
-              parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
-          break;
-        case BOOLEAN:
-          tvList.putBoolean(
-              parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
-          break;
-        case TEXT:
-          tvList.putBinary(
-              parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
-          break;
-        default:
-      }
-
-      parentLayerPointReader.readyForNext();
-    }
-
-    return count;
-  }
 }

[iotdb] 04/04: rename

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 205c3baf63d840cb50d68f0c53aa24259dd047eb
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Sep 6 16:14:19 2021 +0800

    rename
---
 .../apache/iotdb/db/query/expression/binary/BinaryExpression.java    | 4 ++--
 .../apache/iotdb/db/query/expression/unary/FunctionExpression.java   | 5 +++--
 .../apache/iotdb/db/query/expression/unary/NegationExpression.java   | 4 ++--
 .../apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java    | 4 ++--
 ...yer.java => MultiInputColumnMultiReferenceIntermediateLayer.java} | 4 ++--
 ...er.java => MultiInputColumnSingleReferenceIntermediateLayer.java} | 4 ++--
 ...er.java => SingleInputColumnMultiReferenceIntermediateLayer.java} | 4 ++--
 ...r.java => SingleInputColumnSingleReferenceIntermediateLayer.java} | 4 ++--
 8 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index b793ef9..feb51d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.SingleInputMultiOutputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
@@ -126,7 +126,7 @@ public abstract class BinaryExpression extends Expression {
 
       expressionIntermediateLayerMap.put(
           this,
-          new SingleInputMultiOutputIntermediateLayer(
+          new SingleInputColumnMultiReferenceIntermediateLayer(
               constructTransformer(
                   leftParentIntermediateLayer.constructPointReader(),
                   rightParentIntermediateLayer.constructPointReader()),
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index d484769..10b3811 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.MultiInputMultiOutputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.MultiInputColumnMultiReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 
@@ -165,7 +165,8 @@ public class FunctionExpression extends Expression {
       }
 
       expressionIntermediateLayerMap.put(
-          this, new MultiInputMultiOutputIntermediateLayer(parentLayerPointReaders, -1, -1));
+          this,
+          new MultiInputColumnMultiReferenceIntermediateLayer(parentLayerPointReaders, -1, -1));
     }
 
     return expressionIntermediateLayerMap.get(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 326d1af..57a7a69 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.SingleInputMultiOutputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
 
@@ -89,7 +89,7 @@ public class NegationExpression extends Expression {
 
       expressionIntermediateLayerMap.put(
           this,
-          new SingleInputMultiOutputIntermediateLayer(
+          new SingleInputColumnMultiReferenceIntermediateLayer(
               new ArithmeticNegationTransformer(parentIntermediateLayer.constructPointReader()),
               -1,
               -1));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index a67e621..15a24e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
-import org.apache.iotdb.db.query.udf.core.layer.SingleInputMultiOutputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
 import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -80,7 +80,7 @@ public class TimeSeriesOperand extends Expression {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
       expressionIntermediateLayerMap.put(
           this,
-          new SingleInputMultiOutputIntermediateLayer(
+          new SingleInputColumnMultiReferenceIntermediateLayer(
               rawTimeSeriesInputLayer.constructPointReader(
                   udtfPlan.getReaderIndex(path.getFullPath())),
               -1,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnMultiReferenceIntermediateLayer.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java
rename to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnMultiReferenceIntermediateLayer.java
index 2a70a7b..861fd7d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnMultiReferenceIntermediateLayer.java
@@ -27,9 +27,9 @@ import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
 
 import java.util.List;
 
-public class MultiInputMultiOutputIntermediateLayer extends IntermediateLayer {
+public class MultiInputColumnMultiReferenceIntermediateLayer extends IntermediateLayer {
 
-  public MultiInputMultiOutputIntermediateLayer(
+  public MultiInputColumnMultiReferenceIntermediateLayer(
       long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) {
     super(queryId, memoryBudgetInMB);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnSingleReferenceIntermediateLayer.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java
rename to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnSingleReferenceIntermediateLayer.java
index c63b1be..9ef769d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputColumnSingleReferenceIntermediateLayer.java
@@ -27,9 +27,9 @@ import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
 
 import java.util.List;
 
-public class MultiInputSingleOutputIntermediateLayer extends IntermediateLayer {
+public class MultiInputColumnSingleReferenceIntermediateLayer extends IntermediateLayer {
 
-  public MultiInputSingleOutputIntermediateLayer(
+  public MultiInputColumnSingleReferenceIntermediateLayer(
       long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) {
     super(queryId, memoryBudgetInMB);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
rename to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
index b243e54..7bde88c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.io.IOException;
 
-public class SingleInputMultiOutputIntermediateLayer extends IntermediateLayer {
+public class SingleInputColumnMultiReferenceIntermediateLayer extends IntermediateLayer {
 
   private static final int CACHE_BLOCK_SIZE = 2;
 
@@ -45,7 +45,7 @@ public class SingleInputMultiOutputIntermediateLayer extends IntermediateLayer {
   private final ElasticSerializableTVList tvList;
   private final SafetyLine safetyLine;
 
-  public SingleInputMultiOutputIntermediateLayer(
+  public SingleInputColumnMultiReferenceIntermediateLayer(
       long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader)
       throws QueryProcessException {
     super(queryId, memoryBudgetInMB);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
rename to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
index 86111c1..08946b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -34,12 +34,12 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.io.IOException;
 
-public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer {
+public class SingleInputColumnSingleReferenceIntermediateLayer extends IntermediateLayer {
 
   private final LayerPointReader parentLayerPointReader;
   private final TSDataType dataType;
 
-  public SingleInputSingleOutputIntermediateLayer(
+  public SingleInputColumnSingleReferenceIntermediateLayer(
       long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader) {
     super(queryId, memoryBudgetInMB);
     this.parentLayerPointReader = parentLayerPointReader;