You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/08/22 12:23:41 UTC

[GitHub] [iotdb] ZhanGHanG9991 commented on a diff in pull request #7005: [IOTDB-4085] Add StateWindowAccessStrategy in UDF

ZhanGHanG9991 commented on code in PR #7005:
URL: https://github.com/apache/iotdb/pull/7005#discussion_r951373144


##########
server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java:
##########
@@ -530,4 +532,204 @@ public RowWindow currentWindow() {
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowStateWindowReader(
+      StateWindowAccessStrategy strategy, float memoryBudgetInMB) {
+
+    final long displayWindowBegin = strategy.getDisplayWindowBegin();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+    final double delta = strategy.getDelta();
+
+    final ElasticSerializableTVList tvList =
+        ElasticSerializableTVList.newElasticSerializableTVList(
+            dataType, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
+    final ElasticSerializableTVListBackedSingleColumnWindow window =
+        new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+    return new LayerRowWindowReader() {
+
+      private boolean isFirstIteration = true;
+      private boolean hasAtLeastOneRow = false;
+
+      private long nextWindowTimeBegin = displayWindowBegin;
+      private long nextWindowTimeEnd = 0;
+      private int nextIndexBegin = 0;
+      private int nextIndexEnd = 1;
+
+      private ValueRecorder valueRecorder = new ValueRecorder();
+
+      @Override
+      public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (tvList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldPoint(dataType, parentLayerPointReader, tvList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          nextWindowTimeBegin = Math.max(displayWindowBegin, tvList.getTime(0));
+          hasAtLeastOneRow = tvList.size() != 0;
+          isFirstIteration = false;
+        }
+
+        if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        while (tvList.getTime(tvList.size() - 1) < displayWindowEnd) {
+          final YieldableState yieldableState =
+              LayerCacheUtils.yieldPoint(dataType, parentLayerPointReader, tvList);
+          if (yieldableState == YieldableState.YIELDABLE) {
+            if (tvList.getTime(tvList.size() - 2) >= displayWindowBegin
+                && splitWindowForStateWindow(valueRecorder, delta, tvList)) {
+              nextIndexEnd = tvList.size() - 1;
+              break;
+            } else {
+              nextIndexEnd++;
+            }
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+            return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+            nextIndexEnd = tvList.size();
+            break;
+          }
+        }
+
+        nextWindowTimeEnd = tvList.getTime(nextIndexEnd - 1);
+
+        if (nextIndexBegin == nextIndexEnd) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        // Only if encounter user set the strategy's displayWindowBegin, which will go into the for
+        // loop to find the true index of the first window begin.
+        // For other situation, we will only go into if (nextWindowTimeBegin <= tvList.getTime(i))
+        // once.
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeBegin <= tvList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          // The first window's beginning time is greater than all the timestamp of the query result
+          // set
+          if (i == tvList.size() - 1) {
+            return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+          }
+        }
+
+        window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, nextWindowTimeEnd);
+
+        return YieldableState.YIELDABLE;
+      }
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        return false;
+      }
+
+      @Override
+      public void readyForNext() throws IOException {
+        if (nextIndexEnd < tvList.size()) {
+          nextWindowTimeBegin = tvList.getTime(nextIndexEnd);
+        }
+        tvList.setEvictionUpperBound(nextIndexBegin + 1);
+        nextIndexBegin = nextIndexEnd;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
+
+  boolean splitWindowForStateWindow(

Review Comment:
   I've moved it to TransformUtils.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org