You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/03/01 08:00:47 UTC

[iotdb] branch rel/1.1 updated: [IOTDB-5599] Bug: One query is divided too much tasks to be allowed by system

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 31c451a27e [IOTDB-5599] Bug: One query is divided too much tasks to be allowed by system
31c451a27e is described below

commit 31c451a27eb123ad0ac90f300917175721c29d5a
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Mar 1 16:00:38 2023 +0800

    [IOTDB-5599] Bug: One query is divided too much tasks to be allowed by system
---
 .../db/mpp/plan/planner/OperatorTreeGenerator.java      | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index eb53954cfa..efbb423fbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1887,10 +1887,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 IdentitySinkOperator.class.getSimpleName());
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
-    List<Operator> children =
-        node.getChildren().stream()
-            .map(child -> child.accept(this, context))
-            .collect(Collectors.toList());
+    List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, context);
 
     checkArgument(
         MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
@@ -1922,10 +1919,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 ShuffleHelperOperator.class.getSimpleName());
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
 
-    List<Operator> children =
-        node.getChildren().stream()
-            .map(child -> child.accept(this, context))
-            .collect(Collectors.toList());
+    // TODO implement pipeline division for shuffle sink
+    context.setDegreeOfParallelism(1);
+    List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
 
     checkArgument(
         MPP_DATA_EXCHANGE_MANAGER != null, "MPP_DATA_EXCHANGE_MANAGER should not be null");
@@ -2552,7 +2548,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
           subContext.setISink(localSinkChannel);
           subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
 
-          int curChildPipelineNum = subContext.getPipelineNumber() - originPipeNum;
+          // OneByOneChild may be divided into more than dop pipelines, but the number of running
+          // actually is dop
+          int curChildPipelineNum =
+              Math.min(dopForChild, subContext.getPipelineNumber() - originPipeNum);
           childPipelineNums.add(curChildPipelineNum);
           sumOfChildPipelines += curChildPipelineNum;
           // If sumOfChildPipelines > dopForChild, we have to wait until some pipelines finish