You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/17 05:14:08 UTC

[iotdb] 01/01: implement AggregationOperator

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch DONTBlock
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3dae858983da70286c562ca1bc6ba967740ed4ed
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Feb 17 13:13:53 2023 +0800

    implement AggregationOperator
---
 .../operator/process/AggregationOperator.java      | 35 ++++++++++++----------
 1 file changed, 19 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index c320839813..bee71bbe77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -120,19 +120,21 @@ public class AggregationOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
+    boolean isBlocked = false;
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!isEmpty(i)) {
+        continue;
+      }
       ListenableFuture<?> blocked = children.get(i).isBlocked();
       if (blocked.isDone()) {
+        isBlocked = true;
         canCallNext[i] = true;
-      } else {
-        if (isEmpty(i)) {
-          listenableFutures.add(blocked);
-          canCallNext[i] = true;
-        }
+      } else if (!blocked.isDone()) {
+        listenableFutures.add(blocked);
       }
     }
-    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
+    return isBlocked ? NOT_BLOCKED : successfulAsList(listenableFutures);
   }
 
   @Override
@@ -190,21 +192,22 @@ public class AggregationOperator implements ProcessOperator {
   }
 
   private boolean prepareInput() {
+    boolean allReady = true;
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (inputTsBlocks[i] != null) {
+      if (!isEmpty(i)) {
         continue;
       }
-      if (!canCallNext[i]) {
-        return false;
-      }
-
-      inputTsBlocks[i] = children.get(i).nextWithTimer();
-      canCallNext[i] = false;
-      if (inputTsBlocks[i] == null) {
-        return false;
+      if (canCallNext[i]) {
+        inputTsBlocks[i] = children.get(i).nextWithTimer();
+        canCallNext[i] = false;
+        if (inputTsBlocks[i] == null) {
+          allReady = false;
+        }
+      } else {
+        allReady = false;
       }
     }
-    return true;
+    return allReady;
   }
 
   private void calculateNextAggregationResult() {