You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/01 06:11:39 UTC

[iotdb] 01/01: [IOTDB-5599] Bug: One query is divided into more tasks than the system allows

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch SinkBug1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 17624b529f4bdbcb0aac5e29631d359a28eda400
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Mar 1 14:05:37 2023 +0800

    [IOTDB-5599] Bug: One query is divided into more tasks than the system allows
    
    (cherry picked from commit 388df2ff4c29cfef3dde65a67bc7c2e612fa0d72)
---
 .../db/mpp/plan/planner/OperatorTreeGenerator.java      | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index eb53954cfa..efbb423fbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1887,10 +1887,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 IdentitySinkOperator.class.getSimpleName());
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
-    List<Operator> children =
-        node.getChildren().stream()
-            .map(child -> child.accept(this, context))
-            .collect(Collectors.toList());
+    List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, context);
 
     checkArgument(
         MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
@@ -1922,10 +1919,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 ShuffleHelperOperator.class.getSimpleName());
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
-    List<Operator> children =
-        node.getChildren().stream()
-            .map(child -> child.accept(this, context))
-            .collect(Collectors.toList());
+    // TODO implement pipeline division for shuffle sink
+    context.setDegreeOfParallelism(1);
+    List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
 
     checkArgument(
         MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
@@ -2552,7 +2548,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
           subContext.setISink(localSinkChannel);
           subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
 
-          int curChildPipelineNum = subContext.getPipelineNumber() - originPipeNum;
+          // OneByOneChild may be divided into more than dop pipelines, but the number of running
+          // actually is dop
+          int curChildPipelineNum =
+              Math.min(dopForChild, subContext.getPipelineNumber() - originPipeNum);
           childPipelineNums.add(curChildPipelineNum);
           sumOfChildPipelines += curChildPipelineNum;
           // If sumOfChildPipelines > dopForChild, we have to wait until some pipelines finish