You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/15 03:33:50 UTC

[iotdb] branch advancePipeline created (now 7f9f6e0568)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 7f9f6e0568 change pipeline logic

This branch includes the following new commits:

     new 7f9f6e0568 change pipeline logic

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: change pipeline logic

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7f9f6e056881833dba743d68ffa266862391ba79
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Feb 15 11:33:37 2023 +0800

    change pipeline logic
---
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 169 +++++++++++++--------
 .../db/mpp/plan/plan/PipelineBuilderTest.java      |   9 +-
 2 files changed, 109 insertions(+), 69 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 5d99a29ad1..f24dd4103b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2243,77 +2243,86 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       // Keep it since we may change the structure of origin children nodes
       List<PlanNode> afterwardsNodes = new ArrayList<>();
       // 1. Calculate localChildren size
-      int localChildrenSize = 0;
-      for (PlanNode child : node.getChildren()) {
-        if (!(child instanceof ExchangeNode)) {
+      int localChildrenSize = 0, firstChildIndex = -1;
+      for (int i = 0; i < node.getChildren().size(); i++) {
+        if (!(node.getChildren().get(i) instanceof ExchangeNode)) {
           localChildrenSize++;
+          firstChildIndex = firstChildIndex == -1 ? i : firstChildIndex;
+          // deal with exchangeNode at head
+        } else if (firstChildIndex == -1) {
+          Operator childOperation = node.getChildren().get(i).accept(this, context);
+          finalExchangeNum += 1;
+          parentPipelineChildren.add(childOperation);
+          afterwardsNodes.add(node.getChildren().get(i));
         }
       }
-      // 2. divide every childNumInEachPipeline localChildren to different pipeline
-      int[] childNumInEachPipeline =
-          getChildNumInEachPipeline(
-              node.getChildren(), localChildrenSize, context.getDegreeOfParallelism());
-      // If dop > size(children) + 1, we can allocate extra dop to child node
-      // Extra dop = dop - size(children), since dop = 1 means serial but not 0
-      int childGroupNum = Math.min(context.getDegreeOfParallelism(), localChildrenSize);
+      if (firstChildIndex == -1) {
+        context.setExchangeSumNum(finalExchangeNum);
+        return parentPipelineChildren;
+      }
+      // If dop > localChildrenSize + 1, we can allocate extra dop to child node
+      // Extra dop = dop - localChildrenSize, since dop = 1 means serial but not 0
       int dopForChild = Math.max(1, context.getDegreeOfParallelism() - localChildrenSize);
-      int startIndex, endIndex = 0;
-      for (int i = 0; i < childGroupNum; i++) {
-        startIndex = endIndex;
-        endIndex += childNumInEachPipeline[i];
-        // Only if dop >= size(children) + 1, split all children to new pipeline
-        // Otherwise, the first group will belong to the parent pipeline
-        if (i == 0 && context.getDegreeOfParallelism() < localChildrenSize + 1) {
-          for (int j = startIndex; j < endIndex; j++) {
-            Operator childOperation = node.getChildren().get(j).accept(this, context);
+      // If dop > localChildrenSize, we create one new pipeline for each child
+      if (context.getDegreeOfParallelism() > localChildrenSize) {
+        for (int i = firstChildIndex; i < node.getChildren().size(); i++) {
+          PlanNode childNode = node.getChildren().get(i);
+          if (childNode instanceof ExchangeNode) {
+            Operator childOperation = childNode.accept(this, context);
+            finalExchangeNum += 1;
             parentPipelineChildren.add(childOperation);
-            afterwardsNodes.add(node.getChildren().get(j));
+          } else {
+            LocalExecutionPlanContext subContext = context.createSubContext();
+            subContext.setDegreeOfParallelism(dopForChild);
+
+            int originPipeNum = context.getPipelineNumber();
+            Operator sourceOperator = createNewPipelineForChildNode(context, subContext, childNode);
+            parentPipelineChildren.add(sourceOperator);
+            dopForChild =
+                Math.max(1, dopForChild - (subContext.getPipelineNumber() - 1 - originPipeNum));
+            finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
           }
-          continue;
         }
-        LocalExecutionPlanContext subContext = context.createSubContext();
-        subContext.setDegreeOfParallelism(dopForChild);
-        // Create partial parent operator for children
-        PlanNode partialParentNode = null;
-        Operator partialParentOperator = null;
-
-        int originPipeNum = context.getPipelineNumber();
-        if (endIndex - startIndex == 1) {
-          partialParentNode = node.getChildren().get(i);
-          partialParentOperator = node.getChildren().get(i).accept(this, subContext);
-        } else {
-          // PartialParentNode is equals to parentNode except children
-          partialParentNode = node.createSubNode(i, startIndex, endIndex);
-          partialParentOperator = partialParentNode.accept(this, subContext);
+      } else {
+        // If dop <= localChildrenSize, we have to divide every childNumInEachPipeline localChildren
+        // to different pipeline
+        int[] childNumInEachPipeline =
+            getChildNumInEachPipeline(
+                node.getChildren(), localChildrenSize, context.getDegreeOfParallelism());
+        int childGroupNum = Math.min(context.getDegreeOfParallelism(), localChildrenSize);
+        int startIndex, endIndex = firstChildIndex;
+        for (int i = 0; i < childGroupNum; i++) {
+          startIndex = endIndex;
+          endIndex += childNumInEachPipeline[i];
+          // Only if dop >= size(children) + 1, split all children to new pipeline
+          // Otherwise, the first group will belong to the parent pipeline
+          if (i == 0) {
+            for (int j = startIndex; j < endIndex; j++) {
+              Operator childOperation = node.getChildren().get(j).accept(this, context);
+              parentPipelineChildren.add(childOperation);
+              afterwardsNodes.add(node.getChildren().get(j));
+            }
+            continue;
+          }
+          LocalExecutionPlanContext subContext = context.createSubContext();
+          subContext.setDegreeOfParallelism(1);
+          // Create partial parent operator for children
+          PlanNode partialParentNode = null;
+          if (endIndex - startIndex == 1) {
+            partialParentNode = node.getChildren().get(i);
+          } else {
+            // PartialParentNode is equals to parentNode except children
+            partialParentNode = node.createSubNode(i, startIndex, endIndex);
+          }
+
+          Operator sourceOperator =
+              createNewPipelineForChildNode(context, subContext, partialParentNode);
+          parentPipelineChildren.add(sourceOperator);
+          afterwardsNodes.add(partialParentNode);
+          finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
         }
-        // update dop for child
-        dopForChild = Math.max(1, dopForChild - (subContext.getPipelineNumber() - originPipeNum));
-        ISinkHandle localSinkHandle =
-            MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
-                // Attention, there is no parent node, use first child node instead
-                subContext.getDriverContext(), node.getChildren().get(i).getPlanNodeId().getId());
-        subContext.setSinkHandle(localSinkHandle);
-        subContext.addPipelineDriverFactory(partialParentOperator, subContext.getDriverContext());
-
-        ExchangeOperator sourceOperator =
-            new ExchangeOperator(
-                context
-                    .getDriverContext()
-                    .addOperatorContext(
-                        context.getNextOperatorId(), null, ExchangeOperator.class.getSimpleName()),
-                MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
-                    ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
-                    context.getDriverContext()),
-                partialParentNode.getPlanNodeId());
-        context
-            .getTimeSliceAllocator()
-            .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
-        parentPipelineChildren.add(sourceOperator);
-        afterwardsNodes.add(partialParentNode);
-        context.addExchangeOperator(sourceOperator);
-        finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
+        ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
       }
-      ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
     }
     context.setExchangeSumNum(finalExchangeNum);
     return parentPipelineChildren;
@@ -2332,9 +2341,13 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     int[] childNumInEachPipeline = new int[maxPipelineNum];
     int avgChildNum = Math.max(1, localChildrenSize / dop);
     // allocate remaining child to group from splitIndex
-    int splitIndex =
-        localChildrenSize <= dop ? maxPipelineNum : maxPipelineNum - localChildrenSize % dop;
-    int pipelineIndex = 0, childIndex = 0;
+    int splitIndex = maxPipelineNum - localChildrenSize % dop;
+    int childIndex = 0;
+    // Skip ExchangeNode at head
+    while (childIndex < allChildren.size() && allChildren.get(childIndex) instanceof ExchangeNode) {
+      childIndex++;
+    }
+    int pipelineIndex = 0;
     while (pipelineIndex < maxPipelineNum) {
       int childNum = pipelineIndex < splitIndex ? avgChildNum : avgChildNum + 1;
       int originChildIndex = childIndex;
@@ -2353,6 +2366,32 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return childNumInEachPipeline;
   }
 
+  private Operator createNewPipelineForChildNode(
+      LocalExecutionPlanContext context, LocalExecutionPlanContext subContext, PlanNode childNode) {
+    Operator childOperation = childNode.accept(this, subContext);
+    ISinkHandle localSinkHandle =
+        MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
+            // Attention, there is no parent node, use first child node instead
+            subContext.getDriverContext(), childNode.getPlanNodeId().getId());
+    subContext.setSinkHandle(localSinkHandle);
+    subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
+
+    ExchangeOperator sourceOperator =
+        new ExchangeOperator(
+            context
+                .getDriverContext()
+                .addOperatorContext(
+                    context.getNextOperatorId(), null, ExchangeOperator.class.getSimpleName()),
+            MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
+                ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
+                context.getDriverContext()),
+            childNode.getPlanNodeId());
+
+    context.getTimeSliceAllocator().recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
+    context.addExchangeOperator(sourceOperator);
+    return sourceOperator;
+  }
+
   public List<Operator> dealWithConsumeChildrenOneByOneNode(
       PlanNode node, LocalExecutionPlanContext context) {
     List<Operator> parentPipelineChildren = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
index c0ee46142b..d3db6d7811 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
@@ -679,22 +679,23 @@ public class PipelineBuilderTest {
   @Test
   public void testGetChildNumInEachPipeline() {
     List<PlanNode> allChildren = new ArrayList<>();
-    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null));
     allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode1")));
+    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null));
+    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2")));
     allChildren.add(new SeriesScanNode(new PlanNodeId("localNode2"), null));
 
     int[] childNumInEachPipeline =
-        operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 3);
+        operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 2);
     assertEquals(2, childNumInEachPipeline.length);
     assertEquals(2, childNumInEachPipeline[0]);
     assertEquals(1, childNumInEachPipeline[1]);
 
     allChildren.add(new SeriesScanNode(new PlanNodeId("localNode3"), null));
     allChildren.add(new SeriesScanNode(new PlanNodeId("localNode4"), null));
-    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2")));
     allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode3")));
-    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null));
     allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode4")));
+    allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null));
+    allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode5")));
     childNumInEachPipeline = operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 5, 3);
     assertEquals(3, childNumInEachPipeline.length);
     assertEquals(2, childNumInEachPipeline[0]);