You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/07/07 15:48:17 UTC
[iotdb] 04/14: refactor scan operator to batch process
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/aggrOpRefactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b6848520a628dd32c73d8895a2ac1ab28a821793
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Jul 4 21:51:35 2022 +0800
refactor scan operator to batch process
---
.../db/mpp/execution/operator/AggregationUtil.java | 11 +++++--
.../source/SeriesAggregationScanOperator.java | 37 ++++++++++++----------
2 files changed, 29 insertions(+), 19 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index a7ec92ec8c..64b80d0bfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -35,11 +35,10 @@ import java.util.List;
public class AggregationUtil {
- public static TsBlock updateResultTsBlockFromAggregators(
+ public static void appendAggregationResult(
TsBlockBuilder tsBlockBuilder,
List<? extends Aggregator> aggregators,
ITimeRangeIterator timeRangeIterator) {
- tsBlockBuilder.reset();
TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
// Use start time of current time range as time column
timeColumnBuilder.writeLong(timeRangeIterator.currentOutputTime());
@@ -54,6 +53,14 @@ public class AggregationUtil {
aggregator.outputResult(columnBuilder);
}
tsBlockBuilder.declarePosition();
+ }
+
+ public static TsBlock updateResultTsBlockFromAggregators(
+ TsBlockBuilder tsBlockBuilder,
+ List<? extends Aggregator> aggregators,
+ ITimeRangeIterator timeRangeIterator) {
+ tsBlockBuilder.reset();
+ appendAggregationResult(tsBlockBuilder, aggregators, timeRangeIterator);
return tsBlockBuilder.build();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
index 06b4878d2b..1f0e11485c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java
@@ -42,10 +42,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isEndCalc;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators;
/**
* This operator is responsible to do the aggregation calculation for one series based on global
@@ -72,8 +72,7 @@ public class SeriesAggregationScanOperator implements DataSourceOperator {
protected TsBlock preCachedData;
- protected final TsBlockBuilder tsBlockBuilder;
- protected TsBlock resultTsBlock;
+ protected final TsBlockBuilder resultTsBlockBuilder;
protected boolean finished = false;
public SeriesAggregationScanOperator(
@@ -118,7 +117,7 @@ public class SeriesAggregationScanOperator implements DataSourceOperator {
for (Aggregator aggregator : aggregators) {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
- tsBlockBuilder = new TsBlockBuilder(dataTypes);
+ this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true);
this.isGroupByQuery = groupByTimeParameter != null;
}
@@ -135,20 +134,25 @@ public class SeriesAggregationScanOperator implements DataSourceOperator {
@Override
public TsBlock next() {
- if (!timeRangeIterator.hasNextTimeRange()) {
- return null;
- }
- curTimeRange = timeRangeIterator.nextTimeRange();
+ resultTsBlockBuilder.reset();
+ while (timeRangeIterator.hasNextTimeRange() && !resultTsBlockBuilder.isFull()) {
+ curTimeRange = timeRangeIterator.nextTimeRange();
- // 1. Clear previous aggregation result
- for (Aggregator aggregator : aggregators) {
- aggregator.reset();
- aggregator.updateTimeRange(curTimeRange);
+ // 1. Clear previous aggregation result
+ for (Aggregator aggregator : aggregators) {
+ aggregator.reset();
+ aggregator.updateTimeRange(curTimeRange);
+ }
+
+ // 2. Calculate aggregation result based on current time window
+ calculateNextResult();
}
- // 2. Calculate aggregation result based on current time window
- calculateNextResult();
- return resultTsBlock;
+ if (resultTsBlockBuilder.getPositionCount() > 0) {
+ return resultTsBlockBuilder.build();
+ } else {
+ return null;
+ }
}
@Override
@@ -224,8 +228,7 @@ public class SeriesAggregationScanOperator implements DataSourceOperator {
}
protected void updateResultTsBlock() {
- resultTsBlock =
- updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, timeRangeIterator);
+ appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
}
/** @return if already get the result */