You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/17 05:14:08 UTC
[iotdb] 01/01: implement AggregationOperator
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch DONTBlock
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3dae858983da70286c562ca1bc6ba967740ed4ed
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Feb 17 13:13:53 2023 +0800
implement AggregationOperator
---
.../operator/process/AggregationOperator.java | 35 ++++++++++++----------
1 file changed, 19 insertions(+), 16 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index c320839813..bee71bbe77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -120,19 +120,21 @@ public class AggregationOperator implements ProcessOperator {
@Override
public ListenableFuture<?> isBlocked() {
+ boolean isBlocked = false;
List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
for (int i = 0; i < inputOperatorsCount; i++) {
+ if (!isEmpty(i)) {
+ continue;
+ }
ListenableFuture<?> blocked = children.get(i).isBlocked();
if (blocked.isDone()) {
+ isBlocked = true;
canCallNext[i] = true;
- } else {
- if (isEmpty(i)) {
- listenableFutures.add(blocked);
- canCallNext[i] = true;
- }
+ } else if (!blocked.isDone()) {
+ listenableFutures.add(blocked);
}
}
- return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
+ return isBlocked ? NOT_BLOCKED : successfulAsList(listenableFutures);
}
@Override
@@ -190,21 +192,22 @@ public class AggregationOperator implements ProcessOperator {
}
private boolean prepareInput() {
+ boolean allReady = true;
for (int i = 0; i < inputOperatorsCount; i++) {
- if (inputTsBlocks[i] != null) {
+ if (!isEmpty(i)) {
continue;
}
- if (!canCallNext[i]) {
- return false;
- }
-
- inputTsBlocks[i] = children.get(i).nextWithTimer();
- canCallNext[i] = false;
- if (inputTsBlocks[i] == null) {
- return false;
+ if (canCallNext[i]) {
+ inputTsBlocks[i] = children.get(i).nextWithTimer();
+ canCallNext[i] = false;
+ if (inputTsBlocks[i] == null) {
+ allReady = false;
+ }
+ } else {
+ allReady = false;
}
}
- return true;
+ return allReady;
}
private void calculateNextAggregationResult() {