You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/11 12:00:51 UTC
[iotdb] 01/13: implement consumeAllChildren pipeline divided by dop
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8051908a265c74c351d1c6d7b9da6fdc1008248b
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Feb 8 11:29:18 2023 +0800
implement consumeAllChildren pipeline divided by dop
---
.../plan/planner/LocalExecutionPlanContext.java | 9 +++
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 79 ++++++++++++++++++----
.../db/mpp/plan/planner/plan/node/PlanNode.java | 13 ++++
.../planner/plan/node/process/AggregationNode.java | 16 ++++-
.../planner/plan/node/process/DeviceMergeNode.java | 9 +++
.../plan/node/process/GroupByLevelNode.java | 10 +++
.../planner/plan/node/process/GroupByTagNode.java | 15 +++-
.../planner/plan/node/process/MergeSortNode.java | 9 +++
.../planner/plan/node/process/TimeJoinNode.java | 8 +++
.../plan/node/process/VerticallyConcatNode.java | 6 ++
10 files changed, 156 insertions(+), 18 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 2d8e6d7051..32988da5ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -59,6 +59,7 @@ public class LocalExecutionPlanContext {
private final AtomicInteger nextOperatorId;
private final TypeProvider typeProvider;
private final Map<String, Set<String>> allSensorsMap;
+ private int degreeOfParallelism = 4;
// this is shared with all subContexts
private AtomicInteger nextPipelineId;
private List<PipelineDriverFactory> pipelineDriverFactories;
@@ -142,6 +143,14 @@ public class LocalExecutionPlanContext {
return driverContext;
}
+ public int getDegreeOfParallelism() {
+ return degreeOfParallelism;
+ }
+
+ public void setDegreeOfParallelism(int degreeOfParallelism) {
+ this.degreeOfParallelism = degreeOfParallelism;
+ }
+
private int getNextPipelineId() {
return nextPipelineId.getAndIncrement();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index de21989478..d397db9980 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -216,7 +216,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -2211,22 +2213,71 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
private List<Operator> dealWithConsumeAllChildrenPipelineBreaker(
PlanNode node, LocalExecutionPlanContext context) {
// children after pipelining
- List<Operator> children = new ArrayList<>();
+ LinkedList<Operator> parentPipelineChildren = new LinkedList<>();
int finalExchangeNum = context.getExchangeSumNum();
- for (PlanNode childSource : node.getChildren()) {
- // Create pipelines for children
- LocalExecutionPlanContext subContext = context.createSubContext();
- Operator childOperation = childSource.accept(this, subContext);
- // If the child belongs to another fragment instance, we don't create pipeline for it
- if (childOperation instanceof ExchangeOperator) {
- children.add(childOperation);
+ // 1. exclude ExchangeOperator first
+ Iterator<PlanNode> childrenIterator = node.getChildren().listIterator();
+ while (childrenIterator.hasNext()) {
+ PlanNode childSource = childrenIterator.next();
+ if (childSource instanceof ExchangeNode) {
+ Operator childOperation = childSource.accept(this, context);
finalExchangeNum += 1;
- } else {
+ parentPipelineChildren.add(childOperation);
+ // Remove exchangeNode directly for later use
+ childrenIterator.remove();
+ }
+ }
+ List<PlanNode> localChildren = node.getChildren();
+ if (context.getDegreeOfParallelism() == 1) {
+ // If dop = 1, we don't create extra pipeline
+ for (PlanNode localChild : localChildren) {
+ Operator childOperation = localChild.accept(this, context);
+ parentPipelineChildren.add(childOperation);
+ }
+ } else {
+ // 2. divide every childNumInEachPipeline localChildren to different pipeline
+ int childNumInEachPipeline =
+ Math.max(1, localChildren.size() / context.getDegreeOfParallelism());
+ // If dop > size(children) + 1, we can allocate extra dop to child node
+ // Extra dop = dop - size(children), since dop = 1 means serial but not 0
+ int maxDop = Math.min(context.getDegreeOfParallelism(), localChildren.size() + 1);
+ int dopForChild = Math.max(1, context.getDegreeOfParallelism() - localChildren.size());
+
+ for (int i = 0; i < maxDop; i++) {
+ // Only if dop >= size(children) + 1, split all children to new pipeline
+ // Otherwise, the first group but not last will belong to the parent pipeline since the
+ // children number of last group is greaterEqual than the first group
+ if (i == 0 && context.getDegreeOfParallelism() < localChildren.size() + 1) {
+ for (int j = 0; j < childNumInEachPipeline; j++) {
+ Operator childOperation = localChildren.get(j).accept(this, context);
+ parentPipelineChildren.addFirst(childOperation);
+ }
+ continue;
+ }
+ LocalExecutionPlanContext subContext = context.createSubContext();
+ subContext.setDegreeOfParallelism(dopForChild);
+ // Create partial parent operator for children
+ PlanNode partialParentNode = null;
+ Operator partialParentOperator = null;
+ if (childNumInEachPipeline == 1) {
+ partialParentNode = localChildren.get(i);
+ partialParentOperator = localChildren.get(i).accept(this, subContext);
+ } else {
+ // PartialParentNode is equals to parentNode except children
+ int startIndex = i * childNumInEachPipeline,
+ endIndex = i < (maxDop - 1) ? (i + 1) * childNumInEachPipeline : localChildren.size();
+ partialParentNode = node.createSubNode(i, startIndex, endIndex);
+ for (int j = startIndex; j < endIndex; j++) {
+ partialParentNode.addChild(localChildren.get(i));
+ }
+ partialParentOperator = partialParentNode.accept(this, context);
+ }
ISinkHandle localSinkHandle =
MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
- subContext.getDriverContext(), childSource.getPlanNodeId().getId());
+ // Attention, there is no parent node, use first child node instead
+ subContext.getDriverContext(), localChildren.get(i).getPlanNodeId().getId());
subContext.setSinkHandle(localSinkHandle);
- subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
+ subContext.addPipelineDriverFactory(partialParentOperator, subContext.getDriverContext());
ExchangeOperator sourceOperator =
new ExchangeOperator(
@@ -2237,17 +2288,17 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
context.getDriverContext()),
- childSource.getPlanNodeId());
+ partialParentNode.getPlanNodeId());
context
.getTimeSliceAllocator()
.recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
- children.add(sourceOperator);
+ parentPipelineChildren.add(sourceOperator);
context.addExchangeOperator(sourceOperator);
finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
}
}
context.setExchangeSumNum(finalExchangeNum);
- return children;
+ return parentPipelineChildren;
}
private List<Operator> dealWithConsumeChildrenOneByOneNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
index 80ec99d1f3..ae66ddc843 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
@@ -65,6 +65,19 @@ public abstract class PlanNode implements IConsensusRequest {
@Override
public abstract PlanNode clone();
+ /**
+ * Create sub node which has exactly the same function of origin node, only its children is a part
+ * of it, which is composed by the [startIndex, endIndex) of origin children list.
+ *
+ * @param subNodeId the sub node id
+ * @param startIndex the start Index of origin children
+ * @param endIndex the endIndex Index of origin children
+ */
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ throw new UnsupportedOperationException(
+ String.format("Can't Create subNode for %s", this.getClass().toString()));
+ }
+
public PlanNode cloneWithChildren(List<PlanNode> children) {
if (!(children == null
|| allowedChildCount() == CHILD_COUNT_NO_LIMIT
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 6c0c7703e8..69b8eb02db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -50,8 +50,7 @@ import java.util.stream.Collectors;
public class AggregationNode extends MultiChildProcessNode {
// The list of aggregate functions, each AggregateDescriptor will be output as one or two column
- // of
- // result TsBlock
+ // of result TsBlock
protected List<AggregationDescriptor> aggregationDescriptorList;
// The parameter of `group by time`.
@@ -167,6 +166,19 @@ public class AggregationNode extends MultiChildProcessNode {
getScanOrder());
}
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new AggregationNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ children.subList(startIndex, endIndex),
+ // TODO figure out the relation of aggregationDescriptorList and children node
+ getAggregationDescriptorList().subList(startIndex, endIndex),
+ getGroupByTimeParameter(),
+ getGroupByParameter(),
+ getGroupByExpression(),
+ outputEndTime,
+ getScanOrder());
+ }
+
@Override
public List<String> getOutputColumnNames() {
List<String> outputColumnNames = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
index d3ec660e7f..c56d965c49 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
@@ -63,6 +63,15 @@ public class DeviceMergeNode extends MultiChildProcessNode {
return new DeviceMergeNode(getPlanNodeId(), getMergeOrderParameter(), getDevices());
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new DeviceMergeNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ getMergeOrderParameter(),
+ // TODO figure out the relation of devices and children node
+ devices.subList(startIndex, endIndex));
+ }
+
@Override
public List<String> getOutputColumnNames() {
return children.stream()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index da7b8dc9c0..8959eae92b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -93,6 +93,16 @@ public class GroupByLevelNode extends MultiChildProcessNode {
getPlanNodeId(), getGroupByLevelDescriptors(), this.groupByTimeParameter, this.scanOrder);
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new GroupByLevelNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ // TODO figure out the relation of aggregationDescriptorList and children node
+ this.groupByLevelDescriptors.subList(startIndex, endIndex),
+ this.groupByTimeParameter,
+ this.scanOrder);
+ }
+
public List<CrossSeriesAggregationDescriptor> getGroupByLevelDescriptors() {
return groupByLevelDescriptors;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index c2e558bdc7..87580783f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -28,9 +28,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.commons.lang3.Validate;
-
import javax.annotation.Nullable;
+import org.apache.commons.lang3.Validate;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -99,6 +98,18 @@ public class GroupByTagNode extends MultiChildProcessNode {
this.outputColumnNames);
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new GroupByTagNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ this.groupByTimeParameter,
+ this.scanOrder,
+ // TODO figure out the relation of aggregationDescriptorList and children node
+ this.tagKeys,
+ this.tagValuesToAggregationDescriptors,
+ this.outputColumnNames);
+ }
+
@Override
public List<String> getOutputColumnNames() {
List<String> ret = new ArrayList<>(tagKeys);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
index 564a5dbfa4..6c69a70722 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
@@ -54,6 +54,15 @@ public class MergeSortNode extends MultiChildProcessNode {
return new MergeSortNode(getPlanNodeId(), getMergeOrderParameter(), outputColumns);
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new MergeSortNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ getMergeOrderParameter(),
+ // TODO figure out the relation of outputColumns and children node
+ outputColumns.subList(startIndex, endIndex));
+ }
+
@Override
public List<String> getOutputColumnNames() {
return outputColumns;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
index 2598cd4e28..2e89b841d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
@@ -62,6 +62,14 @@ public class TimeJoinNode extends MultiChildProcessNode {
return new TimeJoinNode(getPlanNodeId(), getMergeOrder());
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new TimeJoinNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ getMergeOrder(),
+ children.subList(startIndex, endIndex));
+ }
+
@Override
public List<String> getOutputColumnNames() {
return children.stream()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
index 24760a4228..ab6ef28fda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
@@ -47,6 +47,12 @@ public class VerticallyConcatNode extends MultiChildProcessNode {
return new VerticallyConcatNode(getPlanNodeId());
}
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new VerticallyConcatNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)));
+ }
+
@Override
public List<String> getOutputColumnNames() {
return children.stream()