You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/07/07 15:48:20 UTC
[iotdb] 07/14: refactor AggregationOperator to batch process
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/aggrOpRefactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aef556d6d0a14f874c88ba08b415bf8dd953013f
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Wed Jul 6 18:28:29 2022 +0800
refactor AggregationOperator to batch process
---
.../operator/process/AggregationOperator.java | 72 ++++++++++++++++++----
1 file changed, 60 insertions(+), 12 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 0027328e1e..9f02abab9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -34,8 +34,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints;
/**
* AggregationOperator can process the situation: aggregation of intermediate aggregate result, it
@@ -50,12 +51,16 @@ public class AggregationOperator implements ProcessOperator {
private final int inputOperatorsCount;
private final TsBlock[] inputTsBlocks;
- private final TsBlockBuilder tsBlockBuilder;
+ private final TsBlockBuilder resultTsBlockBuilder;
private final ITimeRangeIterator timeRangeIterator;
// current interval of aggregation window [curStartTime, curEndTime)
private TimeRange curTimeRange;
+ private final boolean ascending;
+
+ private final boolean[] canCallNext;
+
public AggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
@@ -73,9 +78,12 @@ public class AggregationOperator implements ProcessOperator {
for (Aggregator aggregator : aggregators) {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
- tsBlockBuilder = new TsBlockBuilder(dataTypes);
+ this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
this.timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
+ this.ascending = ascending;
+
+ this.canCallNext = new boolean[inputOperatorsCount];
}
@Override
@@ -96,30 +104,70 @@ public class AggregationOperator implements ProcessOperator {
@Override
public TsBlock next() {
- // update input tsBlock
- if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
- curTimeRange = timeRangeIterator.nextTimeRange();
+ resultTsBlockBuilder.reset();
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ canCallNext[i] = true;
}
+
+ while ((curTimeRange != null || timeRangeIterator.hasNextTimeRange())
+ && !resultTsBlockBuilder.isFull()) {
+ if (hasCachedData()) {
+ calculateNextResult();
+ } else {
+ break;
+ }
+ }
+
+ if (resultTsBlockBuilder.getPositionCount() > 0) {
+ return resultTsBlockBuilder.build();
+ } else {
+ return null;
+ }
+ }
+
+ private boolean hasCachedData() {
for (int i = 0; i < inputOperatorsCount; i++) {
if (inputTsBlocks[i] != null) {
continue;
}
+ if (!canCallNext[i]) {
+ return false;
+ }
+
inputTsBlocks[i] = children.get(i).next();
+ canCallNext[i] = false;
if (inputTsBlocks[i] == null) {
- return null;
+ return false;
}
}
+ return true;
+ }
+
+ private void calculateNextResult() {
+ if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
+ curTimeRange = timeRangeIterator.nextTimeRange();
+ // consume current input tsBlocks
+ for (Aggregator aggregator : aggregators) {
+ aggregator.reset();
+ aggregator.updateTimeRange(curTimeRange);
+ }
+ }
+
// consume current input tsBlocks
for (Aggregator aggregator : aggregators) {
- aggregator.reset();
aggregator.processTsBlocks(inputTsBlocks);
}
- // output result from aggregator
- curTimeRange = null;
+
for (int i = 0; i < inputOperatorsCount; i++) {
- inputTsBlocks[i] = null;
+ inputTsBlocks[i] = skipOutOfTimeRangePoints(inputTsBlocks[i], curTimeRange, ascending);
+ if (inputTsBlocks[i].isEmpty()) {
+ inputTsBlocks[i] = null;
+ }
}
- return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, timeRangeIterator);
+ curTimeRange = null;
+
+ // output result from aggregator
+ appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
}
@Override