You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/01 06:11:39 UTC
[iotdb] 01/01: [IOTDB-5599] Bug: One query is divided too much tasks to be allowed by system
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch SinkBug1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 17624b529f4bdbcb0aac5e29631d359a28eda400
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Mar 1 14:05:37 2023 +0800
[IOTDB-5599] Bug: One query is divided too much tasks to be allowed by system
(cherry picked from commit 388df2ff4c29cfef3dde65a67bc7c2e612fa0d72)
---
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index eb53954cfa..efbb423fbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1887,10 +1887,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
IdentitySinkOperator.class.getSimpleName());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
- List<Operator> children =
- node.getChildren().stream()
- .map(child -> child.accept(this, context))
- .collect(Collectors.toList());
+ List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, context);
checkArgument(
MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
@@ -1922,10 +1919,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
ShuffleHelperOperator.class.getSimpleName());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
- List<Operator> children =
- node.getChildren().stream()
- .map(child -> child.accept(this, context))
- .collect(Collectors.toList());
+ // TODO implement pipeline division for shuffle sink
+ context.setDegreeOfParallelism(1);
+ List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
checkArgument(
MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
@@ -2552,7 +2548,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
subContext.setISink(localSinkChannel);
subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
- int curChildPipelineNum = subContext.getPipelineNumber() - originPipeNum;
+ // OneByOneChild may be divided into more than dop pipelines, but the number of running
+ // actually is dop
+ int curChildPipelineNum =
+ Math.min(dopForChild, subContext.getPipelineNumber() - originPipeNum);
childPipelineNums.add(curChildPipelineNum);
sumOfChildPipelines += curChildPipelineNum;
// If sumOfChildPipelines > dopForChild, we have to wait until some pipelines finish