You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/07/18 04:01:20 UTC

[iotdb] branch lmh/isBlockedDebug created (now 34aff4e196)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/isBlockedDebug
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 34aff4e196 fix isBlocked() bug in AggregationOperator

This branch includes the following new commits:

     new 34aff4e196 fix isBlocked() bug in AggregationOperator

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: fix isBlocked() bug in AggregationOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/isBlockedDebug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 34aff4e1969e1c09ae72e879d56c2aaeb7b8d6e1
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Jul 18 12:00:47 2022 +0800

    fix isBlocked() bug in AggregationOperator
---
 .../operator/process/AggregationOperator.java      | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 76b81c2873..8cf4aef8b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -35,6 +35,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 
@@ -75,6 +76,9 @@ public class AggregationOperator implements ProcessOperator {
     this.inputOperatorsCount = children.size();
     this.inputTsBlocks = new TsBlock[inputOperatorsCount];
     this.canCallNext = new boolean[inputOperatorsCount];
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      canCallNext[i] = false;
+    }
 
     this.timeRangeIterator =
         initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
@@ -93,13 +97,19 @@ public class AggregationOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (int i = 0; i < inputOperatorsCount; i++) {
       ListenableFuture<?> blocked = children.get(i).isBlocked();
-      if (!blocked.isDone()) {
-        return blocked;
+      if (blocked.isDone()) {
+        canCallNext[i] = true;
+      } else {
+        if (isEmpty(i)) {
+          listenableFutures.add(blocked);
+          canCallNext[i] = true;
+        }
       }
     }
-    return NOT_BLOCKED;
+    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
   }
 
   @Override
@@ -115,9 +125,6 @@ public class AggregationOperator implements ProcessOperator {
 
     // reset operator state
     resultTsBlockBuilder.reset();
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      canCallNext[i] = true;
-    }
 
     while (System.nanoTime() - start < maxRuntime
         && (curTimeRange != null || timeRangeIterator.hasNextTimeRange())
@@ -199,4 +206,8 @@ public class AggregationOperator implements ProcessOperator {
     curTimeRange = null;
     appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
   }
+
+  private boolean isEmpty(int index) {
+    return inputTsBlocks[index] == null || inputTsBlocks[index].isEmpty();
+  }
 }