You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/17 05:14:07 UTC

[iotdb] branch DONTBlock created (now 3dae858983)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch DONTBlock
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 3dae858983 implement AggregationOperator

This branch includes the following new commits:

     new 3dae858983 implement AggregationOperator

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: implement AggregationOperator

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch DONTBlock
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3dae858983da70286c562ca1bc6ba967740ed4ed
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Feb 17 13:13:53 2023 +0800

    implement AggregationOperator
---
 .../operator/process/AggregationOperator.java      | 35 ++++++++++++----------
 1 file changed, 19 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index c320839813..bee71bbe77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -120,19 +120,21 @@ public class AggregationOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
+    boolean isBlocked = false;
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!isEmpty(i)) {
+        continue;
+      }
       ListenableFuture<?> blocked = children.get(i).isBlocked();
       if (blocked.isDone()) {
+        isBlocked = true;
         canCallNext[i] = true;
-      } else {
-        if (isEmpty(i)) {
-          listenableFutures.add(blocked);
-          canCallNext[i] = true;
-        }
+      } else if (!blocked.isDone()) {
+        listenableFutures.add(blocked);
       }
     }
-    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
+    return isBlocked ? NOT_BLOCKED : successfulAsList(listenableFutures);
   }
 
   @Override
@@ -190,21 +192,22 @@ public class AggregationOperator implements ProcessOperator {
   }
 
   private boolean prepareInput() {
+    boolean allReady = true;
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (inputTsBlocks[i] != null) {
+      if (!isEmpty(i)) {
         continue;
       }
-      if (!canCallNext[i]) {
-        return false;
-      }
-
-      inputTsBlocks[i] = children.get(i).nextWithTimer();
-      canCallNext[i] = false;
-      if (inputTsBlocks[i] == null) {
-        return false;
+      if (canCallNext[i]) {
+        inputTsBlocks[i] = children.get(i).nextWithTimer();
+        canCallNext[i] = false;
+        if (inputTsBlocks[i] == null) {
+          allReady = false;
+        }
+      } else {
+        allReady = false;
       }
     }
-    return true;
+    return allReady;
   }
 
   private void calculateNextAggregationResult() {