You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/07/07 15:48:18 UTC

[iotdb] 05/14: refactor RawDataAggregationOperator to batch process

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/aggrOpRefactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4a3b4e7cf1f24245c8af924a003a2fc1dbf3b16f
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Jul 5 09:44:26 2022 +0800

    refactor RawDataAggregationOperator to batch process
---
 .../process/RawDataAggregationOperator.java        | 31 +++++++++++++++++-----
 1 file changed, 25 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index a64a924426..55e47bb159 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -35,11 +35,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isEndCalc;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.satisfied;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators;
 
 /**
  * RawDataAggregationOperator is used to process raw data tsBlock input calculating using value
@@ -57,14 +57,14 @@ public class RawDataAggregationOperator implements ProcessOperator {
   private final List<Aggregator> aggregators;
   private final Operator child;
   private final boolean ascending;
-  private ITimeRangeIterator timeRangeIterator;
+  private final ITimeRangeIterator timeRangeIterator;
   // current interval of aggregation window [curStartTime, curEndTime)
   private TimeRange curTimeRange;
 
   private TsBlock preCachedData;
 
   // Using for building result tsBlock
-  private final TsBlockBuilder tsBlockBuilder;
+  private final TsBlockBuilder resultTsBlockBuilder;
 
   public RawDataAggregationOperator(
       OperatorContext operatorContext,
@@ -81,7 +81,7 @@ public class RawDataAggregationOperator implements ProcessOperator {
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
-    tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
     this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true);
   }
 
@@ -97,9 +97,26 @@ public class RawDataAggregationOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
+    resultTsBlockBuilder.reset();
+    while ((curTimeRange != null || timeRangeIterator.hasNextTimeRange())
+        && !resultTsBlockBuilder.isFull()) {
+      if (!calculateNextResult()) {
+        break;
+      }
+    }
+
+    if (resultTsBlockBuilder.getPositionCount() > 0) {
+      return resultTsBlockBuilder.build();
+    } else {
+      return null;
+    }
+  }
+
+  private boolean calculateNextResult() {
     // Move to next timeRange
     if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
       curTimeRange = timeRangeIterator.nextTimeRange();
+      // 0. Clear previous aggregation result
       for (Aggregator aggregator : aggregators) {
         aggregator.reset();
         aggregator.updateTimeRange(curTimeRange);
@@ -116,7 +133,7 @@ public class RawDataAggregationOperator implements ProcessOperator {
         canCallNext = false;
         // if child still has next but can't be invoked now
       } else if (child.hasNext()) {
-        return null;
+        return false;
       } else {
         break;
       }
@@ -124,7 +141,9 @@ public class RawDataAggregationOperator implements ProcessOperator {
 
     // 2. Update result using aggregators
     curTimeRange = null;
-    return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, timeRangeIterator);
+    appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
+
+    return true;
   }
 
   @Override