You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/15 03:33:51 UTC
[iotdb] 01/01: change pipeline logic
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7f9f6e056881833dba743d68ffa266862391ba79
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Feb 15 11:33:37 2023 +0800
change pipeline logic
---
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 169 +++++++++++++--------
.../db/mpp/plan/plan/PipelineBuilderTest.java | 9 +-
2 files changed, 109 insertions(+), 69 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 5d99a29ad1..f24dd4103b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2243,77 +2243,86 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
// Keep it since we may change the structure of origin children nodes
List<PlanNode> afterwardsNodes = new ArrayList<>();
// 1. Calculate localChildren size
- int localChildrenSize = 0;
- for (PlanNode child : node.getChildren()) {
- if (!(child instanceof ExchangeNode)) {
+ int localChildrenSize = 0, firstChildIndex = -1;
+ for (int i = 0; i < node.getChildren().size(); i++) {
+ if (!(node.getChildren().get(i) instanceof ExchangeNode)) {
localChildrenSize++;
+ firstChildIndex = firstChildIndex == -1 ? i : firstChildIndex;
+ // deal with exchangeNode at head
+ } else if (firstChildIndex == -1) {
+ Operator childOperation = node.getChildren().get(i).accept(this, context);
+ finalExchangeNum += 1;
+ parentPipelineChildren.add(childOperation);
+ afterwardsNodes.add(node.getChildren().get(i));
}
}
- // 2. divide every childNumInEachPipeline localChildren to different pipeline
- int[] childNumInEachPipeline =
- getChildNumInEachPipeline(
- node.getChildren(), localChildrenSize, context.getDegreeOfParallelism());
- // If dop > size(children) + 1, we can allocate extra dop to child node
- // Extra dop = dop - size(children), since dop = 1 means serial but not 0
- int childGroupNum = Math.min(context.getDegreeOfParallelism(), localChildrenSize);
+ if (firstChildIndex == -1) {
+ context.setExchangeSumNum(finalExchangeNum);
+ return parentPipelineChildren;
+ }
+ // If dop > localChildrenSize + 1, we can allocate extra dop to child node
+ // Extra dop = dop - localChildrenSize, since dop = 1 means serial but not 0
int dopForChild = Math.max(1, context.getDegreeOfParallelism() - localChildrenSize);
- int startIndex, endIndex = 0;
- for (int i = 0; i < childGroupNum; i++) {
- startIndex = endIndex;
- endIndex += childNumInEachPipeline[i];
- // Only if dop >= size(children) + 1, split all children to new pipeline
- // Otherwise, the first group will belong to the parent pipeline
- if (i == 0 && context.getDegreeOfParallelism() < localChildrenSize + 1) {
- for (int j = startIndex; j < endIndex; j++) {
- Operator childOperation = node.getChildren().get(j).accept(this, context);
+ // If dop > localChildrenSize, we create one new pipeline for each child
+ if (context.getDegreeOfParallelism() > localChildrenSize) {
+ for (int i = firstChildIndex; i < node.getChildren().size(); i++) {
+ PlanNode childNode = node.getChildren().get(i);
+ if (childNode instanceof ExchangeNode) {
+ Operator childOperation = childNode.accept(this, context);
+ finalExchangeNum += 1;
parentPipelineChildren.add(childOperation);
- afterwardsNodes.add(node.getChildren().get(j));
+ } else {
+ LocalExecutionPlanContext subContext = context.createSubContext();
+ subContext.setDegreeOfParallelism(dopForChild);
+
+ int originPipeNum = context.getPipelineNumber();
+ Operator sourceOperator = createNewPipelineForChildNode(context, subContext, childNode);
+ parentPipelineChildren.add(sourceOperator);
+ dopForChild =
+ Math.max(1, dopForChild - (subContext.getPipelineNumber() - 1 - originPipeNum));
+ finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
}
- continue;
}
- LocalExecutionPlanContext subContext = context.createSubContext();
- subContext.setDegreeOfParallelism(dopForChild);
- // Create partial parent operator for children
- PlanNode partialParentNode = null;
- Operator partialParentOperator = null;
-
- int originPipeNum = context.getPipelineNumber();
- if (endIndex - startIndex == 1) {
- partialParentNode = node.getChildren().get(i);
- partialParentOperator = node.getChildren().get(i).accept(this, subContext);
- } else {
- // PartialParentNode is equals to parentNode except children
- partialParentNode = node.createSubNode(i, startIndex, endIndex);
- partialParentOperator = partialParentNode.accept(this, subContext);
+ } else {
+ // If dop <= localChildrenSize, we have to divide every childNumInEachPipeline localChildren
+ // to different pipeline
+ int[] childNumInEachPipeline =
+ getChildNumInEachPipeline(
+ node.getChildren(), localChildrenSize, context.getDegreeOfParallelism());
+ int childGroupNum = Math.min(context.getDegreeOfParallelism(), localChildrenSize);
+ int startIndex, endIndex = firstChildIndex;
+ for (int i = 0; i < childGroupNum; i++) {
+ startIndex = endIndex;
+ endIndex += childNumInEachPipeline[i];
+ // Only if dop >= size(children) + 1, split all children to new pipeline
+ // Otherwise, the first group will belong to the parent pipeline
+ if (i == 0) {
+ for (int j = startIndex; j < endIndex; j++) {
+ Operator childOperation = node.getChildren().get(j).accept(this, context);
+ parentPipelineChildren.add(childOperation);
+ afterwardsNodes.add(node.getChildren().get(j));
+ }
+ continue;
+ }
+ LocalExecutionPlanContext subContext = context.createSubContext();
+ subContext.setDegreeOfParallelism(1);
+ // Create partial parent operator for children
+ PlanNode partialParentNode = null;
+ if (endIndex - startIndex == 1) {
+ partialParentNode = node.getChildren().get(i);
+ } else {
+ // PartialParentNode is equals to parentNode except children
+ partialParentNode = node.createSubNode(i, startIndex, endIndex);
+ }
+
+ Operator sourceOperator =
+ createNewPipelineForChildNode(context, subContext, partialParentNode);
+ parentPipelineChildren.add(sourceOperator);
+ afterwardsNodes.add(partialParentNode);
+ finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
}
- // update dop for child
- dopForChild = Math.max(1, dopForChild - (subContext.getPipelineNumber() - originPipeNum));
- ISinkHandle localSinkHandle =
- MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
- // Attention, there is no parent node, use first child node instead
- subContext.getDriverContext(), node.getChildren().get(i).getPlanNodeId().getId());
- subContext.setSinkHandle(localSinkHandle);
- subContext.addPipelineDriverFactory(partialParentOperator, subContext.getDriverContext());
-
- ExchangeOperator sourceOperator =
- new ExchangeOperator(
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(), null, ExchangeOperator.class.getSimpleName()),
- MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
- ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
- context.getDriverContext()),
- partialParentNode.getPlanNodeId());
- context
- .getTimeSliceAllocator()
- .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
- parentPipelineChildren.add(sourceOperator);
- afterwardsNodes.add(partialParentNode);
- context.addExchangeOperator(sourceOperator);
- finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
+ ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
}
- ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
}
context.setExchangeSumNum(finalExchangeNum);
return parentPipelineChildren;
@@ -2332,9 +2341,13 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
int[] childNumInEachPipeline = new int[maxPipelineNum];
int avgChildNum = Math.max(1, localChildrenSize / dop);
// allocate remaining child to group from splitIndex
- int splitIndex =
- localChildrenSize <= dop ? maxPipelineNum : maxPipelineNum - localChildrenSize % dop;
- int pipelineIndex = 0, childIndex = 0;
+ int splitIndex = maxPipelineNum - localChildrenSize % dop;
+ int childIndex = 0;
+ // Skip ExchangeNode at head
+ while (childIndex < allChildren.size() && allChildren.get(childIndex) instanceof ExchangeNode) {
+ childIndex++;
+ }
+ int pipelineIndex = 0;
while (pipelineIndex < maxPipelineNum) {
int childNum = pipelineIndex < splitIndex ? avgChildNum : avgChildNum + 1;
int originChildIndex = childIndex;
@@ -2353,6 +2366,32 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return childNumInEachPipeline;
}
+ /**
+ * Plans {@code childNode} as the root of a new pipeline and returns the operator through which
+ * the parent pipeline reads that new pipeline's output.
+ *
+ * <p>The child's operator tree is built against {@code subContext}. A local sink handle is set on
+ * {@code subContext}, and a driver factory for the child operator is registered there. An
+ * {@link ExchangeOperator} is then created in {@code context}. It reads the sink's shared
+ * TsBlock queue through a local source handle created for {@code context}'s driver context.
+ *
+ * @param context the parent pipeline's planning context; the new exchange operator is added to
+ *     its driver context, time-slice allocator and exchange-operator list
+ * @param subContext the planning context for the new child pipeline (callers obtain it from
+ *     {@code context.createSubContext()})
+ * @param childNode the plan node that becomes the root operator of the new pipeline
+ * @return the exchange operator the parent pipeline should use as this child's source
+ */
+ private Operator createNewPipelineForChildNode(
+ LocalExecutionPlanContext context, LocalExecutionPlanContext subContext, PlanNode childNode) {
+ // Build the child's operator tree against the sub-context, i.e. inside the new pipeline.
+ Operator childOperation = childNode.accept(this, subContext);
+ ISinkHandle localSinkHandle =
+ MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
+ // The new pipeline has no parent node of its own, so childNode's plan node id is used here
+ subContext.getDriverContext(), childNode.getPlanNodeId().getId());
+ subContext.setSinkHandle(localSinkHandle);
+ // Register the child pipeline (rooted at childOperation) with its own driver context.
+ subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
+
+ // Parent-side source: reads from the same shared TsBlock queue the child's sink writes to.
+ ExchangeOperator sourceOperator =
+ new ExchangeOperator(
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(), null, ExchangeOperator.class.getSimpleName()),
+ MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
+ ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
+ context.getDriverContext()),
+ childNode.getPlanNodeId());
+
+ // Record an execution weight of 1 for the new exchange operator in the parent's allocator.
+ context.getTimeSliceAllocator().recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
+ context.addExchangeOperator(sourceOperator);
+ return sourceOperator;
+ }
+
public List<Operator> dealWithConsumeChildrenOneByOneNode(
PlanNode node, LocalExecutionPlanContext context) {
List<Operator> parentPipelineChildren = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
index c0ee46142b..d3db6d7811 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java
@@ -679,22 +679,23 @@ public class PipelineBuilderTest {
@Test
public void testGetChildNumInEachPipeline() {
List<PlanNode> allChildren = new ArrayList<>();
- allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null));
allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode1")));
+ allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null));
+ allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2")));
allChildren.add(new SeriesScanNode(new PlanNodeId("localNode2"), null));
int[] childNumInEachPipeline =
- operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 3);
+ operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 2);
assertEquals(2, childNumInEachPipeline.length);
assertEquals(2, childNumInEachPipeline[0]);
assertEquals(1, childNumInEachPipeline[1]);
allChildren.add(new SeriesScanNode(new PlanNodeId("localNode3"), null));
allChildren.add(new SeriesScanNode(new PlanNodeId("localNode4"), null));
- allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2")));
allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode3")));
- allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null));
allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode4")));
+ allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null));
+ allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode5")));
childNumInEachPipeline = operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 5, 3);
assertEquals(3, childNumInEachPipeline.length);
assertEquals(2, childNumInEachPipeline[0]);