You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/07/07 15:48:18 UTC
[iotdb] 05/14: refactor RawDataAggregationOperator to batch process
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/aggrOpRefactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4a3b4e7cf1f24245c8af924a003a2fc1dbf3b16f
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Jul 5 09:44:26 2022 +0800
refactor RawDataAggregationOperator to batch process
---
.../process/RawDataAggregationOperator.java | 31 +++++++++++++++++-----
1 file changed, 25 insertions(+), 6 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index a64a924426..55e47bb159 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -35,11 +35,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isEndCalc;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.satisfied;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators;
/**
* RawDataAggregationOperator is used to process raw data tsBlock input calculating using value
@@ -57,14 +57,14 @@ public class RawDataAggregationOperator implements ProcessOperator {
private final List<Aggregator> aggregators;
private final Operator child;
private final boolean ascending;
- private ITimeRangeIterator timeRangeIterator;
+ private final ITimeRangeIterator timeRangeIterator;
// current interval of aggregation window [curStartTime, curEndTime)
private TimeRange curTimeRange;
private TsBlock preCachedData;
// Using for building result tsBlock
- private final TsBlockBuilder tsBlockBuilder;
+ private final TsBlockBuilder resultTsBlockBuilder;
public RawDataAggregationOperator(
OperatorContext operatorContext,
@@ -81,7 +81,7 @@ public class RawDataAggregationOperator implements ProcessOperator {
for (Aggregator aggregator : aggregators) {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
- tsBlockBuilder = new TsBlockBuilder(dataTypes);
+ resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true);
}
@@ -97,9 +97,26 @@ public class RawDataAggregationOperator implements ProcessOperator {
@Override
public TsBlock next() {
+ resultTsBlockBuilder.reset();
+ while ((curTimeRange != null || timeRangeIterator.hasNextTimeRange())
+ && !resultTsBlockBuilder.isFull()) {
+ if (!calculateNextResult()) {
+ break;
+ }
+ }
+
+ if (resultTsBlockBuilder.getPositionCount() > 0) {
+ return resultTsBlockBuilder.build();
+ } else {
+ return null;
+ }
+ }
+
+ private boolean calculateNextResult() {
// Move to next timeRange
if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) {
curTimeRange = timeRangeIterator.nextTimeRange();
+ // 0. Clear previous aggregation result
for (Aggregator aggregator : aggregators) {
aggregator.reset();
aggregator.updateTimeRange(curTimeRange);
@@ -116,7 +133,7 @@ public class RawDataAggregationOperator implements ProcessOperator {
canCallNext = false;
// if child still has next but can't be invoked now
} else if (child.hasNext()) {
- return null;
+ return false;
} else {
break;
}
@@ -124,7 +141,9 @@ public class RawDataAggregationOperator implements ProcessOperator {
// 2. Update result using aggregators
curTimeRange = null;
- return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, timeRangeIterator);
+ appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
+
+ return true;
}
@Override