You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/29 03:23:13 UTC

[iotdb] 01/01: fix

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixGroupByTag
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2b043dfff2b10585cb3ebc8a85bca7ba6dc92388
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 29 11:22:35 2022 +0800

    fix
---
 .../operator/process/TagAggregationOperator.java   | 41 ++++++++++++++--------
 1 file changed, 26 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
index 4abbd1dcb3..cf18c26b74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.lang3.Validate;
 
@@ -38,6 +37,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
 public class TagAggregationOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
@@ -46,8 +47,8 @@ public class TagAggregationOperator implements ProcessOperator {
   private final List<Operator> children;
   private final TsBlock[] inputTsBlocks;
 
-  // Indicate whether a child operator's next() is called
-  private final boolean[] hasCalledNext;
+  // Indicates whether a child operator's next() can be called.
+  private final boolean[] canCallNext;
 
   // These fields record the to be consumed index of each tsBlock.
   private final int[] consumedIndices;
@@ -84,7 +85,8 @@ public class TagAggregationOperator implements ProcessOperator {
     this.tsBlockBuilder = new TsBlockBuilder(actualOutputColumnTypes);
     // Initialize input tsblocks for each aggregator group.
     this.inputTsBlocks = new TsBlock[children.size()];
-    this.hasCalledNext = new boolean[children.size()];
+    this.canCallNext = new boolean[children.size()];
+    Arrays.fill(canCallNext, false);
     this.consumedIndices = new int[children.size()];
     this.maxRetainedSize = children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
     this.childrenRetainedSize =
@@ -99,9 +101,9 @@ public class TagAggregationOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    Arrays.fill(hasCalledNext, false);
     long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     long start = System.nanoTime();
+
     boolean successful = true;
     while (System.nanoTime() - start < maxRuntime && !tsBlockBuilder.isFull() && successful) {
       successful = processOneRow();
@@ -116,13 +118,19 @@ public class TagAggregationOperator implements ProcessOperator {
 
   private boolean processOneRow() {
     for (int i = 0; i < children.size(); i++) {
-      // If the data is unavailable first, try to find next tsblock of the child.
-      if (dataUnavailable(i) && !hasCalledNext[i]) {
-        inputTsBlocks[i] = children.get(i).next();
-        consumedIndices[i] = 0;
-        hasCalledNext[i] = true;
+      if (!dataUnavailable(i)) {
+        continue;
+      }
+
+      if (!canCallNext[i]) {
+        return false;
       }
 
+      // If the data is unavailable, try to fetch the next tsblock from the child.
+      inputTsBlocks[i] = children.get(i).next();
+      consumedIndices[i] = 0;
+      canCallNext[i] = false;
+
       // If it's still unavailable, then blocked by children i.
       if (dataUnavailable(i)) {
         return false;
@@ -193,15 +201,18 @@ public class TagAggregationOperator implements ProcessOperator {
   @Override
   public ListenableFuture<?> isBlocked() {
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
-    for (int i = 0; i < children.size(); i++) {
-      if (dataUnavailable(i)) {
-        ListenableFuture<?> blocked = children.get(i).isBlocked();
-        if (!blocked.isDone()) {
+    for (int i = 0, size = children.size(); i < size; i++) {
+      ListenableFuture<?> blocked = children.get(i).isBlocked();
+      if (blocked.isDone()) {
+        canCallNext[i] = true;
+      } else {
+        if (dataUnavailable(i)) {
           listenableFutures.add(blocked);
+          canCallNext[i] = true;
         }
       }
     }
-    return listenableFutures.isEmpty() ? NOT_BLOCKED : Futures.successfulAsList(listenableFutures);
+    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
   }
 
   @Override