You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/28 07:31:58 UTC
[iotdb] branch master updated: [IOTDB-5593] Improve efficiency of DistributionPlanner by recording map instead of recursive search
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4fd995957f [IOTDB-5593] Improve efficiency of DistributionPlanner by recording map instead of recursive search
4fd995957f is described below
commit 4fd995957fabb030e35d7caf115a99f2f4edb260
Author: Liao Lanyu <14...@qq.com>
AuthorDate: Tue Feb 28 15:31:52 2023 +0800
[IOTDB-5593] Improve efficiency of DistributionPlanner by recording map instead of recursive search
---
.../mpp/execution/exchange/sink/SinkChannel.java | 6 +-
.../execution/exchange/source/SourceHandle.java | 7 ++-
.../planner/distribution/DistributionPlanner.java | 73 +++++++++-------------
.../planner/distribution/ExchangeNodeAdder.java | 15 +++--
.../planner/distribution/NodeGroupContext.java | 7 +--
.../SimpleFragmentParallelPlanner.java | 16 ++---
.../db/mpp/plan/planner/plan/PlanFragment.java | 18 ------
.../plan/node/sink/MultiChildrenSinkNode.java | 4 ++
8 files changed, 63 insertions(+), 83 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index 9ad82768c6..fbcdaf2e0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -419,7 +419,11 @@ public class SinkChannel implements ISinkChannel {
public void run() {
try (SetThreadName sinkChannelName = new SetThreadName(threadName)) {
LOGGER.debug(
- "[NotifyNewTsBlock] [{}, {})", startSequenceId, startSequenceId + blockSizes.size());
+ "[NotifyNewTsBlock] [{}, {}) to {}.{}",
+ startSequenceId,
+ startSequenceId + blockSizes.size(),
+ remoteFragmentInstanceId,
+ remotePlanNodeId);
int attempt = 0;
TNewDataBlockEvent newDataBlockEvent =
new TNewDataBlockEvent(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index 10821ca1ac..db462721d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -470,7 +470,12 @@ public class SourceHandle implements ISourceHandle {
@Override
public void run() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
- LOGGER.debug("[StartPullTsBlocksFromRemote] [{}, {}) ", startSequenceId, endSequenceId);
+ LOGGER.debug(
+ "[StartPullTsBlocksFromRemote] {}-{} [{}, {}) ",
+ remoteFragmentInstanceId,
+ indexOfUpstreamSinkHandle,
+ startSequenceId,
+ endSequenceId);
TGetDataBlockRequest req =
new TGetDataBlockRequest(
remoteFragmentInstanceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 73fe5c5259..6b58ae1fe7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.commons.lang3.Validate;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -89,7 +88,7 @@ public class DistributionPlanner {
&& (((QueryStatement) analysis.getStatement()).isAlignByDevice()),
root);
PlanNode newRoot = adder.visit(root, nodeGroupContext);
- adjustUpStream(nodeGroupContext);
+ adjustUpStream(newRoot, nodeGroupContext);
return newRoot;
}
@@ -99,8 +98,8 @@ public class DistributionPlanner {
* org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode} for the children of
* ExchangeNodes with Same DataRegion.
*/
- private void adjustUpStream(NodeGroupContext context) {
- if (context.exchangeNodes.isEmpty()) {
+ private void adjustUpStream(PlanNode root, NodeGroupContext context) {
+ if (!context.hasExchangeNode) {
return;
}
@@ -108,44 +107,34 @@ public class DistributionPlanner {
analysis.getStatement() instanceof QueryStatement
&& needShuffleSinkNode((QueryStatement) analysis.getStatement(), context);
- // step1: group children of ExchangeNodes
- Map<TRegionReplicaSet, List<PlanNode>> nodeGroups = new HashMap<>();
- context.exchangeNodes.forEach(
- exchangeNode ->
- nodeGroups
- .computeIfAbsent(
- context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region,
- exchangeNodes -> new ArrayList<>())
- .add(exchangeNode.getChild()));
-
- // step2: add IdentitySinkNode/ShuffleSinkNode as parent for nodes of each group
- nodeGroups
- .values()
- .forEach(
- planNodeList -> {
- MultiChildrenSinkNode parent =
- needShuffleSinkNode
- ? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
- : new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId());
- parent.addChildren(planNodeList);
- // we put the parent in list to get it quickly by dataRegion of one ExchangeNode
- planNodeList.add(parent);
- });
-
- // step3: add child for each ExchangeNode,
- // the child is IdentitySinkNode/ShuffleSinkNode we generated in the last step
+ adjustUpStreamHelper(root, new HashMap<>(), needShuffleSinkNode, context);
+ }
- // count the visited time of each SinkNode
- Map<TRegionReplicaSet, Integer> visitedCount = new HashMap<>();
- context.exchangeNodes.forEach(
- exchangeNode -> {
- TRegionReplicaSet regionOfChild =
- context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region;
- visitedCount.compute(regionOfChild, (region, count) -> (count == null) ? 0 : count + 1);
- List<PlanNode> planNodeList = nodeGroups.get(regionOfChild);
- exchangeNode.setChild(planNodeList.get(planNodeList.size() - 1));
- exchangeNode.setIndexOfUpstreamSinkHandle(visitedCount.get(regionOfChild));
- });
+ private void adjustUpStreamHelper(
+ PlanNode root,
+ Map<TRegionReplicaSet, MultiChildrenSinkNode> memo,
+ boolean needShuffleSinkNode,
+ NodeGroupContext context) {
+ for (PlanNode child : root.getChildren()) {
+ adjustUpStreamHelper(child, memo, needShuffleSinkNode, context);
+ if (child instanceof ExchangeNode) {
+ ExchangeNode exchangeNode = (ExchangeNode) child;
+ TRegionReplicaSet regionOfChild =
+ context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region;
+ MultiChildrenSinkNode newChild =
+ memo.computeIfAbsent(
+ regionOfChild,
+ tRegionReplicaSet ->
+ needShuffleSinkNode
+ ? new ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
+ : new IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId()));
+ newChild.addChild(exchangeNode.getChild());
+ newChild.addDownStreamChannelLocation(
+ new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
+ exchangeNode.setChild(newChild);
+ exchangeNode.setIndexOfUpstreamSinkHandle(newChild.getCurrentLastIndex());
+ }
+ }
}
/** Return true if we need to use ShuffleSinkNode instead of IdentitySinkNode. */
@@ -265,8 +254,6 @@ public class DistributionPlanner {
exchangeNode.getChild() instanceof MultiChildrenSinkNode,
"child of ExchangeNode must be MultiChildrenSinkNode");
MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) (exchangeNode.getChild());
- sinkNode.addDownStreamChannelLocation(
- new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
// We cut off the subtree to make the ExchangeNode as the leaf node of current PlanFragment
exchangeNode.cleanChildren();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 14a5ec1c00..56dfda1cd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -124,7 +124,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
- context.exchangeNodes.add(exchangeNode);
+ context.hasExchangeNode = true;
newNode.addChild(exchangeNode);
} else {
newNode.addChild(child);
@@ -288,13 +288,18 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
// For align by device,
// if dataRegions of children are the same, we set child's dataRegion to this node,
// else we set the selected mostlyUsedDataRegion to this node
+ boolean inSame = nodeDistributionIsSame(visitedChildren, context);
dataRegion =
- nodeDistributionIsSame(visitedChildren, context)
+ inSame
? context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).region
: context.getMostlyUsedDataRegion();
context.putNodeDistribution(
newNode.getPlanNodeId(),
- new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, dataRegion));
+ new NodeDistribution(
+ inSame
+ ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
+ : NodeDistributionType.SAME_WITH_SOME_CHILD,
+ dataRegion));
} else {
// TODO For align by time, we keep old logic for now
dataRegion = calculateDataRegionByChildren(visitedChildren, context);
@@ -324,7 +329,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
- context.exchangeNodes.add(exchangeNode);
+ context.hasExchangeNode = true;
newNode.addChild(exchangeNode);
} else {
newNode.addChild(child);
@@ -345,7 +350,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
- context.exchangeNodes.add(exchangeNode);
+ context.hasExchangeNode = true;
newNode.addChild(exchangeNode);
}
return newNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
index 389e30f0eb..6dda89e41f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
@@ -24,13 +24,10 @@ import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -39,14 +36,14 @@ public class NodeGroupContext {
private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
private final boolean isAlignByDevice;
private final TRegionReplicaSet mostlyUsedDataRegion;
- protected final List<ExchangeNode> exchangeNodes;
+ protected boolean hasExchangeNode;
public NodeGroupContext(MPPQueryContext queryContext, boolean isAlignByDevice, PlanNode root) {
this.queryContext = queryContext;
this.nodeDistributionMap = new HashMap<>();
this.isAlignByDevice = isAlignByDevice;
this.mostlyUsedDataRegion = isAlignByDevice ? getMostlyUsedDataRegion(root) : null;
- this.exchangeNodes = new ArrayList<>();
+ this.hasExchangeNode = false;
}
private TRegionReplicaSet getMostlyUsedDataRegion(PlanNode root) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index d86c96d452..6361a91f98 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -34,12 +34,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.MultiChildrenSinkNode;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +64,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
// Record all the FragmentInstances belonged to same PlanFragment
Map<PlanFragmentId, FragmentInstance> instanceMap;
// Record which PlanFragment the PlanNode belongs
- Map<PlanNodeId, PlanFragmentId> planNodeMap;
+ Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
List<FragmentInstance> fragmentInstanceList;
public SimpleFragmentParallelPlanner(
@@ -93,13 +93,10 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
}
private void produceFragmentInstance(PlanFragment fragment) {
- // If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased
- // one by one
- PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getPlanNodeTree());
Filter timeFilter = analysis.getGlobalTimeFilter();
FragmentInstance fragmentInstance =
new FragmentInstance(
- new PlanFragment(fragment.getId(), rootCopy),
+ fragment,
fragment.getId().genFragmentInstanceId(),
timeFilter,
queryContext.getQueryType(),
@@ -217,8 +214,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
downStreamInstance.getId().toThrift());
// Set upstream info for corresponding ExchangeNode in downstream FragmentInstance
- PlanNode downStreamExchangeNode =
- downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
+ PlanNode downStreamExchangeNode = planNodeMap.get(downStreamNodeId).right;
((ExchangeNode) downStreamExchangeNode)
.setUpstream(
instance.getHostDataNode().getMPPDataExchangeEndPoint(),
@@ -230,11 +226,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
}
private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) {
- return instanceMap.get(planNodeMap.get(exchangeNodeId));
+ return instanceMap.get(planNodeMap.get(exchangeNodeId).left);
}
private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
- planNodeMap.put(root.getPlanNodeId(), planFragmentId);
+ planNodeMap.put(root.getPlanNodeId(), new Pair<>(planFragmentId, root));
for (PlanNode child : root.getChildren()) {
recordPlanNodeRelation(child, planFragmentId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index aac15e7940..5fd8b79a7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.planner.SubPlanTypeExtractor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.VirtualSourceNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -133,23 +132,6 @@ public class PlanFragment {
return null;
}
- public PlanNode getPlanNodeById(PlanNodeId nodeId) {
- return getPlanNodeById(planNodeTree, nodeId);
- }
-
- private PlanNode getPlanNodeById(PlanNode root, PlanNodeId nodeId) {
- if (root.getPlanNodeId().equals(nodeId)) {
- return root;
- }
- for (PlanNode child : root.getChildren()) {
- PlanNode node = getPlanNodeById(child, nodeId);
- if (node != null) {
- return node;
- }
- }
- return null;
- }
-
public void serialize(ByteBuffer byteBuffer) {
id.serialize(byteBuffer);
planNodeTree.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
index 686dae93e9..1e1ad0f428 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/sink/MultiChildrenSinkNode.java
@@ -114,4 +114,8 @@ public abstract class MultiChildrenSinkNode extends SinkNode {
public void addDownStreamChannelLocation(DownStreamChannelLocation downStreamChannelLocation) {
downStreamChannelLocationList.add(downStreamChannelLocation);
}
+
+ public int getCurrentLastIndex() {
+ return children.size() - 1;
+ }
}