You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/14 07:05:18 UTC

[iotdb] 05/05: modify WindowSplitOperator

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cf1cffa7f9bad2b738a27d902a07e4a4b32558d3
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 15:04:54 2022 +0800

    modify WindowSplitOperator
---
 .../main/java/org/apache/iotdb/SessionExample.java |  4 +--
 .../operator/process/WindowConcatOperator.java     | 14 ++++-----
 .../operator/process/WindowSplitOperator.java      | 33 ++++++++++++++++------
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 17 +++++++++--
 4 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 8760400c91..9baf3a8ad2 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -73,9 +73,9 @@ public class SessionExample {
     session.setFetchSize(10000);
 
     List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1");
-    List<Integer> indexes = Arrays.asList(1, 3, 5, 7);
+    List<Integer> indexes = Arrays.asList(0, 1, 2, 3);
     List<SessionDataSet> windowBatch =
-        session.fetchWindowBatch(queryPaths, null, 1, 40, 2, 2, indexes);
+        session.fetchWindowBatch(queryPaths, null, 0, 32, 4, 3, indexes);
     for (SessionDataSet window : windowBatch) {
       System.out.println(window.getColumnNames());
       while (window.hasNext()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
index 1f40016f9a..269b579feb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
@@ -55,36 +55,36 @@ public class WindowConcatOperator implements ProcessOperator {
 
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    return child.next();
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    return child.hasNext();
   }
 
   @Override
   public boolean isFinished() {
-    return false;
+    return child.isFinished();
   }
 
   @Override
   public long calculateMaxPeekMemory() {
-    return 0;
+    return child.calculateMaxPeekMemory();
   }
 
   @Override
   public long calculateMaxReturnSize() {
-    return 0;
+    return child.calculateMaxReturnSize();
   }
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return 0;
+    return child.calculateRetainedSizeAfterCallingNext();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
index f2ee804f75..6b9544b1a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
@@ -42,18 +42,23 @@ public class WindowSplitOperator implements ProcessOperator {
   protected TsBlock inputTsBlock;
   protected boolean canCallNext;
 
-  private final ITimeRangeIterator sampleTimeRangeSliceIterator;
+  private final ITimeRangeIterator sampleTimeRangeIterator;
   private TimeRange curTimeRange;
 
+  private final ITimeRangeIterator sampleTimeRangeSliceIterator;
+  private TimeRange curTimeRangeSlice;
+
   private final TsBlockBuilder resultTsBlockBuilder;
 
   public WindowSplitOperator(
       OperatorContext operatorContext,
       Operator child,
+      ITimeRangeIterator sampleTimeRangeIterator,
       ITimeRangeIterator sampleTimeRangeSliceIterator,
       List<TSDataType> outputDataTypes) {
     this.operatorContext = operatorContext;
     this.child = child;
+    this.sampleTimeRangeIterator = sampleTimeRangeIterator;
     this.sampleTimeRangeSliceIterator = sampleTimeRangeSliceIterator;
     this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
   }
@@ -73,15 +78,27 @@ public class WindowSplitOperator implements ProcessOperator {
     // reset operator state
     canCallNext = true;
 
-    if (curTimeRange == null && sampleTimeRangeSliceIterator.hasNextTimeRange()) {
-      // move to next time window
-      curTimeRange = sampleTimeRangeSliceIterator.nextTimeRange();
+    if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) {
+      curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+    }
+
+    while (curTimeRangeSlice == null && sampleTimeRangeSliceIterator.hasNextTimeRange()) {
+      curTimeRangeSlice = sampleTimeRangeSliceIterator.nextTimeRange();
+      if (curTimeRangeSlice.getMin() > curTimeRange.getMax()) {
+        if (sampleTimeRangeIterator.hasNextTimeRange()) {
+          curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+        }
+        if (curTimeRangeSlice.getMin() > curTimeRange.getMax()
+            || curTimeRangeSlice.getMax() < curTimeRange.getMin()) {
+          curTimeRangeSlice = null;
+        }
+      }
     }
 
     if (!fetchData()) {
       return null;
     } else {
-      curTimeRange = null;
+      curTimeRangeSlice = null;
       TsBlock resultTsBlock = resultTsBlockBuilder.build();
       resultTsBlockBuilder.reset();
       return resultTsBlock;
@@ -106,14 +123,14 @@ public class WindowSplitOperator implements ProcessOperator {
       return false;
     }
 
-    inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, true);
+    inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRangeSlice, true);
     if (inputTsBlock == null) {
       return false;
     }
 
     for (int readIndex = 0; readIndex < inputTsBlock.getPositionCount(); readIndex++) {
       long time = inputTsBlock.getTimeByIndex(readIndex);
-      if (curTimeRange.contains(time)) {
+      if (curTimeRangeSlice.contains(time)) {
         writeData(readIndex);
       } else {
         inputTsBlock = inputTsBlock.subTsBlock(readIndex);
@@ -135,7 +152,7 @@ public class WindowSplitOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return curTimeRange != null || sampleTimeRangeSliceIterator.hasNextTimeRange();
+    return curTimeRangeSlice != null || sampleTimeRangeSliceIterator.hasNextTimeRange();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 888c3f94a7..d839451544 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1592,7 +1592,15 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 WindowSplitOperator.class.getSimpleName());
 
     GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
-    ITimeRangeIterator timeRangeIterator =
+    ITimeRangeIterator sampleTimeRangeIterator =
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            groupByTimeParameter.getStartTime(),
+            groupByTimeParameter.getEndTime(),
+            groupByTimeParameter.getInterval(),
+            groupByTimeParameter.getSlidingStep(),
+            node.getSamplingIndexes(),
+            false);
+    ITimeRangeIterator sampleTimeRangeSliceIterator =
         TimeRangeIteratorFactory.getSampleTimeRangeIterator(
             groupByTimeParameter.getStartTime(),
             groupByTimeParameter.getEndTime(),
@@ -1604,7 +1612,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
 
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-    return new WindowSplitOperator(operatorContext, child, timeRangeIterator, outputDataTypes);
+    return new WindowSplitOperator(
+        operatorContext,
+        child,
+        sampleTimeRangeIterator,
+        sampleTimeRangeSliceIterator,
+        outputDataTypes);
   }
 
   @Override