You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/07/07 15:48:20 UTC

[iotdb] 07/14: refactor AggregationOperator to batch process

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/aggrOpRefactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aef556d6d0a14f874c88ba08b415bf8dd953013f
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Jul 6 18:28:29 2022 +0800

    refactor AggregationOperator to batch process
---
 .../operator/process/AggregationOperator.java      | 72 ++++++++++++++++++----
 1 file changed, 60 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 0027328e1e..9f02abab9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -34,8 +34,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints;
 
 /**
  * AggregationOperator can process the situation: aggregation of intermediate aggregate result, it
@@ -50,12 +51,16 @@ public class AggregationOperator implements ProcessOperator {
 
   private final int inputOperatorsCount;
   private final TsBlock[] inputTsBlocks;
-  private final TsBlockBuilder tsBlockBuilder;
+  private final TsBlockBuilder resultTsBlockBuilder;
 
   private final ITimeRangeIterator timeRangeIterator;
   // current interval of aggregation window [curStartTime, curEndTime)
   private TimeRange curTimeRange;
 
+  private final boolean ascending;
+
+  private final boolean[] canCallNext;
+
   public AggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
@@ -73,9 +78,12 @@ public class AggregationOperator implements ProcessOperator {
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
-    tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
     this.timeRangeIterator =
         initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
+    this.ascending = ascending;
+
+    this.canCallNext = new boolean[inputOperatorsCount];
   }
 
   @Override
@@ -96,30 +104,70 @@ public class AggregationOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    // update input tsBlock
-    if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
-      curTimeRange = timeRangeIterator.nextTimeRange();
+    resultTsBlockBuilder.reset();
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      canCallNext[i] = true;
     }
+
+    while ((curTimeRange != null || timeRangeIterator.hasNextTimeRange())
+        && !resultTsBlockBuilder.isFull()) {
+      if (hasCachedData()) {
+        calculateNextResult();
+      } else {
+        break;
+      }
+    }
+
+    if (resultTsBlockBuilder.getPositionCount() > 0) {
+      return resultTsBlockBuilder.build();
+    } else {
+      return null;
+    }
+  }
+
+  private boolean hasCachedData() {
     for (int i = 0; i < inputOperatorsCount; i++) {
       if (inputTsBlocks[i] != null) {
         continue;
       }
+      if (!canCallNext[i]) {
+        return false;
+      }
+
       inputTsBlocks[i] = children.get(i).next();
+      canCallNext[i] = false;
       if (inputTsBlocks[i] == null) {
-        return null;
+        return false;
       }
     }
+    return true;
+  }
+
+  private void calculateNextResult() {
+    if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
+      curTimeRange = timeRangeIterator.nextTimeRange();
+      // consume current input tsBlocks
+      for (Aggregator aggregator : aggregators) {
+        aggregator.reset();
+        aggregator.updateTimeRange(curTimeRange);
+      }
+    }
+
     // consume current input tsBlocks
     for (Aggregator aggregator : aggregators) {
-      aggregator.reset();
       aggregator.processTsBlocks(inputTsBlocks);
     }
-    // output result from aggregator
-    curTimeRange = null;
+
     for (int i = 0; i < inputOperatorsCount; i++) {
-      inputTsBlocks[i] = null;
+      inputTsBlocks[i] = skipOutOfTimeRangePoints(inputTsBlocks[i], curTimeRange, ascending);
+      if (inputTsBlocks[i].isEmpty()) {
+        inputTsBlocks[i] = null;
+      }
     }
-    return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, timeRangeIterator);
+    curTimeRange = null;
+
+    // output result from aggregator
+    appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
   }
 
   @Override