You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/07/07 15:48:17 UTC

[iotdb] 04/14: refactor scan operator to batch process

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/aggrOpRefactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b6848520a628dd32c73d8895a2ac1ab28a821793
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Jul 4 21:51:35 2022 +0800

    refactor scan operator to batch process
---
 .../db/mpp/execution/operator/AggregationUtil.java | 11 +++++--
 .../source/SeriesAggregationScanOperator.java      | 37 ++++++++++++----------
 2 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index a7ec92ec8c..64b80d0bfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -35,11 +35,10 @@ import java.util.List;
 
 public class AggregationUtil {
 
-  public static TsBlock updateResultTsBlockFromAggregators(
+  public static void appendAggregationResult(
       TsBlockBuilder tsBlockBuilder,
       List<? extends Aggregator> aggregators,
       ITimeRangeIterator timeRangeIterator) {
-    tsBlockBuilder.reset();
     TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
     // Use start time of current time range as time column
     timeColumnBuilder.writeLong(timeRangeIterator.currentOutputTime());
@@ -54,6 +53,14 @@ public class AggregationUtil {
       aggregator.outputResult(columnBuilder);
     }
     tsBlockBuilder.declarePosition();
+  }
+
+  public static TsBlock updateResultTsBlockFromAggregators(
+      TsBlockBuilder tsBlockBuilder,
+      List<? extends Aggregator> aggregators,
+      ITimeRangeIterator timeRangeIterator) {
+    tsBlockBuilder.reset();
+    appendAggregationResult(tsBlockBuilder, aggregators, timeRangeIterator);
     return tsBlockBuilder.build();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
index 06b4878d2b..1f0e11485c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
@@ -42,10 +42,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isEndCalc;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators;
 
 /**
  * This operator is responsible to do the aggregation calculation for one series based on global
@@ -72,8 +72,7 @@ public class SeriesAggregationScanOperator implements DataSourceOperator {
 
   protected TsBlock preCachedData;
 
-  protected final TsBlockBuilder tsBlockBuilder;
-  protected TsBlock resultTsBlock;
+  protected final TsBlockBuilder resultTsBlockBuilder;
   protected boolean finished = false;
 
   public SeriesAggregationScanOperator(
@@ -118,7 +117,7 @@ public class SeriesAggregationScanOperator implements DataSourceOperator {
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
-    tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
     this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true);
     this.isGroupByQuery = groupByTimeParameter != null;
   }
@@ -135,20 +134,25 @@ public class SeriesAggregationScanOperator implements DataSourceOperator {
 
   @Override
   public TsBlock next() {
-    if (!timeRangeIterator.hasNextTimeRange()) {
-      return null;
-    }
-    curTimeRange = timeRangeIterator.nextTimeRange();
+    resultTsBlockBuilder.reset();
+    while (timeRangeIterator.hasNextTimeRange() && !resultTsBlockBuilder.isFull()) {
+      curTimeRange = timeRangeIterator.nextTimeRange();
 
-    // 1. Clear previous aggregation result
-    for (Aggregator aggregator : aggregators) {
-      aggregator.reset();
-      aggregator.updateTimeRange(curTimeRange);
+      // 1. Clear previous aggregation result
+      for (Aggregator aggregator : aggregators) {
+        aggregator.reset();
+        aggregator.updateTimeRange(curTimeRange);
+      }
+
+      // 2. Calculate aggregation result based on current time window
+      calculateNextResult();
     }
 
-    // 2. Calculate aggregation result based on current time window
-    calculateNextResult();
-    return resultTsBlock;
+    if (resultTsBlockBuilder.getPositionCount() > 0) {
+      return resultTsBlockBuilder.build();
+    } else {
+      return null;
+    }
   }
 
   @Override
@@ -224,8 +228,7 @@ public class SeriesAggregationScanOperator implements DataSourceOperator {
   }
 
   protected void updateResultTsBlock() {
-    resultTsBlock =
-        updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, timeRangeIterator);
+    appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
   }
 
   /** @return if already get the result */