You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/15 03:33:51 UTC

[iotdb] 01/01: change pipeline logic

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7f9f6e056881833dba743d68ffa266862391ba79
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Feb 15 11:33:37 2023 +0800

    change pipeline logic
---
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 169 +++++++++++++--------
 .../db/mpp/plan/plan/PipelineBuilderTest.java      |   9 +-
 2 files changed, 109 insertions(+), 69 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 5d99a29ad1..f24dd4103b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2243,77 +2243,86 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       // Keep it since we may change the structure of origin children nodes
       List<PlanNode> afterwardsNodes = new ArrayList<>();
       // 1. Calculate localChildren size
-      int localChildrenSize = 0;
-      for (PlanNode child : node.getChildren()) {
-        if (!(child instanceof ExchangeNode)) {
+      int localChildrenSize = 0, firstChildIndex = -1;
+      for (int i = 0; i < node.getChildren().size(); i++) {
+        if (!(node.getChildren().get(i) instanceof ExchangeNode)) {
           localChildrenSize++;
+          firstChildIndex = firstChildIndex == -1 ? i : firstChildIndex;
+          // deal with exchangeNode at head
+        } else if (firstChildIndex == -1) {
+          Operator childOperation = node.getChildren().get(i).accept(this, context);
+          finalExchangeNum += 1;
+          parentPipelineChildren.add(childOperation);
+          afterwardsNodes.add(node.getChildren().get(i));
         }
       }
-      // 2. divide every childNumInEachPipeline localChildren to different pipeline
-      int[] childNumInEachPipeline =
-          getChildNumInEachPipeline(
-              node.getChildren(), localChildrenSize, context.getDegreeOfParallelism());
-      // If dop > size(children) + 1, we can allocate extra dop to child node
-      // Extra dop = dop - size(children), since dop = 1 means serial but not 0
-      int childGroupNum = Math.min(context.getDegreeOfParallelism(), localChildrenSize);
+      if (firstChildIndex == -1) {
+        context.setExchangeSumNum(finalExchangeNum);
+        return parentPipelineChildren;
+      }
+      // If dop > localChildrenSize + 1, we can allocate extra dop to child node
+      // Extra dop = dop - localChildrenSize, since dop = 1 means serial but not 0
       int dopForChild = Math.max(1, context.getDegreeOfParallelism() - localChildrenSize);
-      int startIndex, endIndex = 0;
-      for (int i = 0; i < childGroupNum; i++) {
-        startIndex = endIndex;
-        endIndex += childNumInEachPipeline[i];
-        // Only if dop >= size(children) + 1, split all children to new pipeline
-        // Otherwise, the first group will belong to the parent pipeline
-        if (i == 0 && context.getDegreeOfParallelism() < localChildrenSize + 1) {
-          for (int j = startIndex; j < endIndex; j++) {
-            Operator childOperation = node.getChildren().get(j).accept(this, context);
+      // If dop > localChildrenSize, we create one new pipeline for each child
+      if (context.getDegreeOfParallelism() > localChildrenSize) {
+        for (int i = firstChildIndex; i < node.getChildren().size(); i++) {
+          PlanNode childNode = node.getChildren().get(i);
+          if (childNode instanceof ExchangeNode) {
+            Operator childOperation = childNode.accept(this, context);
+            finalExchangeNum += 1;
             parentPipelineChildren.add(childOperation);
-            afterwardsNodes.add(node.getChildren().get(j));
+          } else {
+            LocalExecutionPlanContext subContext = context.createSubContext();
+            subContext.setDegreeOfParallelism(dopForChild);
+
+            int originPipeNum = context.getPipelineNumber();
+            Operator sourceOperator = createNewPipelineForChildNode(context, subContext, childNode);
+            parentPipelineChildren.add(sourceOperator);
+            dopForChild =
+                Math.max(1, dopForChild - (subContext.getPipelineNumber() - 1 - originPipeNum));
+            finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
           }
-          continue;
         }
-        LocalExecutionPlanContext subContext = context.createSubContext();
-        subContext.setDegreeOfParallelism(dopForChild);
-        // Create partial parent operator for children
-        PlanNode partialParentNode = null;
-        Operator partialParentOperator = null;
-
-        int originPipeNum = context.getPipelineNumber();
-        if (endIndex - startIndex == 1) {
-          partialParentNode = node.getChildren().get(i);
-          partialParentOperator = node.getChildren().get(i).accept(this, subContext);
-        } else {
-          // PartialParentNode is equals to parentNode except children
-          partialParentNode = node.createSubNode(i, startIndex, endIndex);
-          partialParentOperator = partialParentNode.accept(this, subContext);
+      } else {
+        // If dop <= localChildrenSize, we have to divide every childNumInEachPipeline localChildren
+        // to different pipeline
+        int[] childNumInEachPipeline =
+            getChildNumInEachPipeline(
+                node.getChildren(), localChildrenSize, context.getDegreeOfParallelism());
+        int childGroupNum = Math.min(context.getDegreeOfParallelism(), localChildrenSize);
+        int startIndex, endIndex = firstChildIndex;
+        for (int i = 0; i < childGroupNum; i++) {
+          startIndex = endIndex;
+          endIndex += childNumInEachPipeline[i];
+          // In this branch dop <= localChildrenSize, so the first group always stays in the
+          // parent pipeline; every later group is built in a new pipeline
+          if (i == 0) {
+            for (int j = startIndex; j < endIndex; j++) {
+              Operator childOperation = node.getChildren().get(j).accept(this, context);
+              parentPipelineChildren.add(childOperation);
+              afterwardsNodes.add(node.getChildren().get(j));
+            }
+            continue;
+          }
+          LocalExecutionPlanContext subContext = context.createSubContext();
+          subContext.setDegreeOfParallelism(1);
+          // Create partial parent operator for children
+          PlanNode partialParentNode = null;
+          if (endIndex - startIndex == 1) {
+            partialParentNode = node.getChildren().get(i);
+          } else {
+            // partialParentNode is the same as the parent node except for its children
+            partialParentNode = node.createSubNode(i, startIndex, endIndex);
+          }
+
+          Operator sourceOperator =
+              createNewPipelineForChildNode(context, subContext, partialParentNode);
+          parentPipelineChildren.add(sourceOperator);
+          afterwardsNodes.add(partialParentNode);
+          finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
         }
-        // update dop for child
-        dopForChild = Math.max(1, dopForChild - (subContext.getPipelineNumber() - originPipeNum));
-        ISinkHandle localSinkHandle =
-            MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
-                // Attention, there is no parent node, use first child node instead
-                subContext.getDriverContext(), node.getChildren().get(i).getPlanNodeId().getId());
-        subContext.setSinkHandle(localSinkHandle);
-        subContext.addPipelineDriverFactory(partialParentOperator, subContext.getDriverContext());
-
-        ExchangeOperator sourceOperator =
-            new ExchangeOperator(
-                context
-                    .getDriverContext()
-                    .addOperatorContext(
-                        context.getNextOperatorId(), null, ExchangeOperator.class.getSimpleName()),
-                MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
-                    ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
-                    context.getDriverContext()),
-                partialParentNode.getPlanNodeId());
-        context
-            .getTimeSliceAllocator()
-            .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
-        parentPipelineChildren.add(sourceOperator);
-        afterwardsNodes.add(partialParentNode);
-        context.addExchangeOperator(sourceOperator);
-        finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
+        ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
       }
-      ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
     }
     context.setExchangeSumNum(finalExchangeNum);
     return parentPipelineChildren;
@@ -2332,9 +2341,13 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     int[] childNumInEachPipeline = new int[maxPipelineNum];
     int avgChildNum = Math.max(1, localChildrenSize / dop);
     // allocate remaining child to group from splitIndex
-    int splitIndex =
-        localChildrenSize <= dop ? maxPipelineNum : maxPipelineNum - localChildrenSize % dop;
-    int pipelineIndex = 0, childIndex = 0;
+    int splitIndex = maxPipelineNum - localChildrenSize % dop;
+    int childIndex = 0;
+    // Skip ExchangeNode at head
+    while (childIndex < allChildren.size() && allChildren.get(childIndex) instanceof ExchangeNode) {
+      childIndex++;
+    }
+    int pipelineIndex = 0;
     while (pipelineIndex < maxPipelineNum) {
       int childNum = pipelineIndex < splitIndex ? avgChildNum : avgChildNum + 1;
       int originChildIndex = childIndex;
@@ -2353,6 +2366,32 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return childNumInEachPipeline;
   }
 
+  private Operator createNewPipelineForChildNode(
+      LocalExecutionPlanContext context, LocalExecutionPlanContext subContext, PlanNode childNode) {
+    Operator childOperation = childNode.accept(this, subContext);
+    ISinkHandle localSinkHandle =
+        MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
+            // Attention: there is no parent node here, so childNode's id is used instead
+            subContext.getDriverContext(), childNode.getPlanNodeId().getId());
+    subContext.setSinkHandle(localSinkHandle);
+    subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
+
+    ExchangeOperator sourceOperator =
+        new ExchangeOperator(
+            context
+                .getDriverContext()
+                .addOperatorContext(
+                    context.getNextOperatorId(), null, ExchangeOperator.class.getSimpleName()),
+            MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
+                ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
+                context.getDriverContext()),
+            childNode.getPlanNodeId());
+
+    context.getTimeSliceAllocator().recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
+    context.addExchangeOperator(sourceOperator);
+    return sourceOperator;
+  }
+
   public List<Operator> dealWithConsumeChildrenOneByOneNode(
       PlanNode node, LocalExecutionPlanContext context) {
     List<Operator> parentPipelineChildren = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
index c0ee46142b..d3db6d7811 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
@@ -679,22 +679,23 @@ public class PipelineBuilderTest {
   @Test
   public void testGetChildNumInEachPipeline() {
     List<PlanNode> allChildren = new ArrayList<>();
-    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null));
     allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode1")));
+    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null));
+    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2")));
     allChildren.add(new SeriesScanNode(new PlanNodeId("localNode2"), null));
 
     int[] childNumInEachPipeline =
-        operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 3);
+        operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 2);
     assertEquals(2, childNumInEachPipeline.length);
     assertEquals(2, childNumInEachPipeline[0]);
     assertEquals(1, childNumInEachPipeline[1]);
 
     allChildren.add(new SeriesScanNode(new PlanNodeId("localNode3"), null));
     allChildren.add(new SeriesScanNode(new PlanNodeId("localNode4"), null));
-    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2")));
     allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode3")));
-    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null));
     allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode4")));
+    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null));
+    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode5")));
     childNumInEachPipeline = operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 5, 3);
     assertEquals(3, childNumInEachPipeline.length);
     assertEquals(2, childNumInEachPipeline[0]);