You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/29 03:23:13 UTC
[iotdb] 01/01: fix
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/FixGroupByTag
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2b043dfff2b10585cb3ebc8a85bca7ba6dc92388
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 29 11:22:35 2022 +0800
fix
---
.../operator/process/TagAggregationOperator.java | 41 ++++++++++++++--------
1 file changed, 26 insertions(+), 15 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
index 4abbd1dcb3..cf18c26b74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.Validate;
@@ -38,6 +37,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
public class TagAggregationOperator implements ProcessOperator {
private final OperatorContext operatorContext;
@@ -46,8 +47,8 @@ public class TagAggregationOperator implements ProcessOperator {
private final List<Operator> children;
private final TsBlock[] inputTsBlocks;
- // Indicate whether a child operator's next() is called
- private final boolean[] hasCalledNext;
+ // Indicates whether a child operator's next() may be called: set in isBlocked(), cleared after each next() call
+ private final boolean[] canCallNext;
// These fields record the to be consumed index of each tsBlock.
private final int[] consumedIndices;
@@ -84,7 +85,8 @@ public class TagAggregationOperator implements ProcessOperator {
this.tsBlockBuilder = new TsBlockBuilder(actualOutputColumnTypes);
// Initialize input tsblocks for each aggregator group.
this.inputTsBlocks = new TsBlock[children.size()];
- this.hasCalledNext = new boolean[children.size()];
+ this.canCallNext = new boolean[children.size()];
+ Arrays.fill(canCallNext, false);
this.consumedIndices = new int[children.size()];
this.maxRetainedSize = children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
this.childrenRetainedSize =
@@ -99,9 +101,9 @@ public class TagAggregationOperator implements ProcessOperator {
@Override
public TsBlock next() {
- Arrays.fill(hasCalledNext, false);
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
+
boolean successful = true;
while (System.nanoTime() - start < maxRuntime && !tsBlockBuilder.isFull() && successful) {
successful = processOneRow();
@@ -116,13 +118,19 @@ public class TagAggregationOperator implements ProcessOperator {
private boolean processOneRow() {
for (int i = 0; i < children.size(); i++) {
- // If the data is unavailable first, try to find next tsblock of the child.
- if (dataUnavailable(i) && !hasCalledNext[i]) {
- inputTsBlocks[i] = children.get(i).next();
- consumedIndices[i] = 0;
- hasCalledNext[i] = true;
+ if (!dataUnavailable(i)) {
+ continue;
+ }
+
+ if (!canCallNext[i]) {
+ return false;
}
+ // The data of child i is unavailable, so try to fetch the next tsblock from that child.
+ inputTsBlocks[i] = children.get(i).next();
+ consumedIndices[i] = 0;
+ canCallNext[i] = false;
+
// If it's still unavailable, then blocked by children i.
if (dataUnavailable(i)) {
return false;
@@ -193,15 +201,18 @@ public class TagAggregationOperator implements ProcessOperator {
@Override
public ListenableFuture<?> isBlocked() {
List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
- for (int i = 0; i < children.size(); i++) {
- if (dataUnavailable(i)) {
- ListenableFuture<?> blocked = children.get(i).isBlocked();
- if (!blocked.isDone()) {
+ for (int i = 0, size = children.size(); i < size; i++) {
+ ListenableFuture<?> blocked = children.get(i).isBlocked();
+ if (blocked.isDone()) {
+ canCallNext[i] = true;
+ } else {
+ if (dataUnavailable(i)) {
listenableFutures.add(blocked);
+ canCallNext[i] = true;
}
}
}
- return listenableFutures.isEmpty() ? NOT_BLOCKED : Futures.successfulAsList(listenableFutures);
+ return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
}
@Override