You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/14 07:05:16 UTC

[iotdb] 03/05: add WindowConcatOperator

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 64b03425133cc6b758e93762b92994818638d7fe
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 10:27:00 2022 +0800

    add WindowConcatOperator
---
 .../TimeRangeIteratorFactory.java                  |  3 +-
 ...plitOperator.java => WindowConcatOperator.java} | 82 ++--------------------
 .../operator/process/WindowSplitOperator.java      | 12 ++--
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 32 ++++++++-
 4 files changed, 45 insertions(+), 84 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
index cd0ac8a08e..552cca4bb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
@@ -80,7 +80,8 @@ public class TimeRangeIteratorFactory {
       long endTime,
       long interval,
       long slidingStep,
-      List<Integer> samplingIndexes) {
+      List<Integer> samplingIndexes,
+      boolean outputPartialTimeWindow) {
     return new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
index d8d4ebc1f7..1f40016f9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
@@ -26,15 +26,10 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
-
-import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.List;
 
-public class WindowSplitOperator implements ProcessOperator {
+public class WindowConcatOperator implements ProcessOperator {
 
   protected final OperatorContext operatorContext;
 
@@ -47,7 +42,7 @@ public class WindowSplitOperator implements ProcessOperator {
 
   private final TsBlockBuilder resultTsBlockBuilder;
 
-  public WindowSplitOperator(
+  public WindowConcatOperator(
       OperatorContext operatorContext,
       Operator child,
       ITimeRangeIterator sampleTimeRangeIterator,
@@ -60,87 +55,22 @@ public class WindowSplitOperator implements ProcessOperator {
 
   @Override
   public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
+    return null;
   }
 
   @Override
   public TsBlock next() {
-    // reset operator state
-    canCallNext = true;
-
-    if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) {
-      // move to next time window
-      curTimeRange = sampleTimeRangeIterator.nextTimeRange();
-    }
-
-    if (!fetchData()) {
-      return null;
-    } else {
-      curTimeRange = null;
-      TsBlock resultTsBlock = resultTsBlockBuilder.build();
-      resultTsBlockBuilder.reset();
-      return resultTsBlock;
-    }
-  }
-
-  private boolean fetchData() {
-    while (!consumeInput()) {
-      // NOTE: child.next() can only be invoked once
-      if (child.hasNext() && canCallNext) {
-        inputTsBlock = child.next();
-        canCallNext = false;
-      } else {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private boolean consumeInput() {
-    if (inputTsBlock == null) {
-      return false;
-    }
-
-    inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, true);
-    if (inputTsBlock == null) {
-      return false;
-    }
-
-    for (int readIndex = 0; readIndex < inputTsBlock.getPositionCount(); readIndex++) {
-      long time = inputTsBlock.getTimeByIndex(readIndex);
-      if (curTimeRange.contains(time)) {
-        writeData(readIndex);
-      } else {
-        inputTsBlock = inputTsBlock.subTsBlock(readIndex);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void writeData(int readIndex) {
-    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
-    timeColumnBuilder.writeLong(inputTsBlock.getTimeByIndex(readIndex));
-    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
-    for (int columnIndex = 0; columnIndex < columnBuilders.length; columnIndex++) {
-      columnBuilders[columnIndex].write(inputTsBlock.getColumn(columnIndex), readIndex);
-    }
-    resultTsBlockBuilder.declarePosition();
+    return null;
   }
 
   @Override
   public boolean hasNext() {
-    return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange();
+    return false;
   }
 
   @Override
   public boolean isFinished() {
-    return !this.hasNext();
+    return false;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
index d8d4ebc1f7..f2ee804f75 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
@@ -42,7 +42,7 @@ public class WindowSplitOperator implements ProcessOperator {
   protected TsBlock inputTsBlock;
   protected boolean canCallNext;
 
-  private final ITimeRangeIterator sampleTimeRangeIterator;
+  private final ITimeRangeIterator sampleTimeRangeSliceIterator;
   private TimeRange curTimeRange;
 
   private final TsBlockBuilder resultTsBlockBuilder;
@@ -50,11 +50,11 @@ public class WindowSplitOperator implements ProcessOperator {
   public WindowSplitOperator(
       OperatorContext operatorContext,
       Operator child,
-      ITimeRangeIterator sampleTimeRangeIterator,
+      ITimeRangeIterator sampleTimeRangeSliceIterator,
       List<TSDataType> outputDataTypes) {
     this.operatorContext = operatorContext;
     this.child = child;
-    this.sampleTimeRangeIterator = sampleTimeRangeIterator;
+    this.sampleTimeRangeSliceIterator = sampleTimeRangeSliceIterator;
     this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
   }
 
@@ -73,9 +73,9 @@ public class WindowSplitOperator implements ProcessOperator {
     // reset operator state
     canCallNext = true;
 
-    if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) {
+    if (curTimeRange == null && sampleTimeRangeSliceIterator.hasNextTimeRange()) {
       // move to next time window
-      curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+      curTimeRange = sampleTimeRangeSliceIterator.nextTimeRange();
     }
 
     if (!fetchData()) {
@@ -135,7 +135,7 @@ public class WindowSplitOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() {
-    return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange();
+    return curTimeRange != null || sampleTimeRangeSliceIterator.hasNextTimeRange();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 4c69cd5720..888c3f94a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper
 import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.WindowConcatOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.WindowSplitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
@@ -149,6 +150,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowConcatNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
@@ -1596,7 +1598,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             groupByTimeParameter.getEndTime(),
             groupByTimeParameter.getInterval(),
             groupByTimeParameter.getSlidingStep(),
-            node.getSamplingIndexes());
+            node.getSamplingIndexes(),
+            true);
 
     List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
 
@@ -1604,6 +1607,33 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return new WindowSplitOperator(operatorContext, child, timeRangeIterator, outputDataTypes);
   }
 
+  @Override
+  public Operator visitWindowConcat(WindowConcatNode node, LocalExecutionPlanContext context) {
+    Operator child = node.getChild().accept(this, context);
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                WindowConcatOperator.class.getSimpleName());
+
+    GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator timeRangeIterator =
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            groupByTimeParameter.getStartTime(),
+            groupByTimeParameter.getEndTime(),
+            groupByTimeParameter.getInterval(),
+            groupByTimeParameter.getSlidingStep(),
+            node.getSamplingIndexes(),
+            false);
+
+    List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    return new WindowConcatOperator(operatorContext, child, timeRangeIterator, outputDataTypes);
+  }
+
   @Override
   public Operator visitSchemaFetchMerge(
       SchemaFetchMergeNode node, LocalExecutionPlanContext context) {