You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/29 03:23:12 UTC

[iotdb] branch lmh/FixGroupByTag created (now 2b043dfff2)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/FixGroupByTag
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 2b043dfff2 fix

This branch includes the following new commits:

     new 2b043dfff2 fix

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: fix

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixGroupByTag
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2b043dfff2b10585cb3ebc8a85bca7ba6dc92388
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 29 11:22:35 2022 +0800

    fix
---
 .../operator/process/TagAggregationOperator.java   | 41 ++++++++++++++--------
 1 file changed, 26 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
index 4abbd1dcb3..cf18c26b74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.lang3.Validate;
 
@@ -38,6 +37,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
 public class TagAggregationOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
@@ -46,8 +47,8 @@ public class TagAggregationOperator implements ProcessOperator {
   private final List<Operator> children;
   private final TsBlock[] inputTsBlocks;
 
-  // Indicate whether a child operator's next() is called
-  private final boolean[] hasCalledNext;
+  // Indicate whether a child operator's next() can be called
+  private final boolean[] canCallNext;
 
   // These fields record the to be consumed index of each tsBlock.
   private final int[] consumedIndices;
@@ -84,7 +85,8 @@ public class TagAggregationOperator implements ProcessOperator {
     this.tsBlockBuilder = new TsBlockBuilder(actualOutputColumnTypes);
     // Initialize input tsblocks for each aggregator group.
     this.inputTsBlocks = new TsBlock[children.size()];
-    this.hasCalledNext = new boolean[children.size()];
+    this.canCallNext = new boolean[children.size()];
+    Arrays.fill(canCallNext, false);
     this.consumedIndices = new int[children.size()];
     this.maxRetainedSize = children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
     this.childrenRetainedSize =
@@ -99,9 +101,9 @@ public class TagAggregationOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() {
-    Arrays.fill(hasCalledNext, false);
     long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
     long start = System.nanoTime();
+
     boolean successful = true;
     while (System.nanoTime() - start < maxRuntime && !tsBlockBuilder.isFull() && successful) {
       successful = processOneRow();
@@ -116,13 +118,19 @@ public class TagAggregationOperator implements ProcessOperator {
 
   private boolean processOneRow() {
     for (int i = 0; i < children.size(); i++) {
-      // If the data is unavailable first, try to find next tsblock of the child.
-      if (dataUnavailable(i) && !hasCalledNext[i]) {
-        inputTsBlocks[i] = children.get(i).next();
-        consumedIndices[i] = 0;
-        hasCalledNext[i] = true;
+      if (!dataUnavailable(i)) {
+        continue;
+      }
+
+      if (!canCallNext[i]) {
+        return false;
       }
 
+      // If the data is unavailable first, try to find next tsblock of the child.
+      inputTsBlocks[i] = children.get(i).next();
+      consumedIndices[i] = 0;
+      canCallNext[i] = false;
+
       // If it's still unavailable, then blocked by children i.
       if (dataUnavailable(i)) {
         return false;
@@ -193,15 +201,18 @@ public class TagAggregationOperator implements ProcessOperator {
   @Override
   public ListenableFuture<?> isBlocked() {
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
-    for (int i = 0; i < children.size(); i++) {
-      if (dataUnavailable(i)) {
-        ListenableFuture<?> blocked = children.get(i).isBlocked();
-        if (!blocked.isDone()) {
+    for (int i = 0, size = children.size(); i < size; i++) {
+      ListenableFuture<?> blocked = children.get(i).isBlocked();
+      if (blocked.isDone()) {
+        canCallNext[i] = true;
+      } else {
+        if (dataUnavailable(i)) {
           listenableFutures.add(blocked);
+          canCallNext[i] = true;
         }
       }
     }
-    return listenableFutures.isEmpty() ? NOT_BLOCKED : Futures.successfulAsList(listenableFutures);
+    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
   }
 
   @Override