You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (the "here" hyperlink is not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/01 20:02:22 UTC
(pinot) branch master updated: Refactor PlanFragmenter to make the logic more clear (#11912)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 03a9ec73cd Refactor PlanFragmenter to make the logic more clear (#11912)
03a9ec73cd is described below
commit 03a9ec73cd8faee5d6b0a27f09c78cbbc9e75eb5
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Nov 1 13:02:17 2023 -0700
Refactor PlanFragmenter to make the logic more clear (#11912)
---
.../planner/logical/PinotLogicalQueryPlanner.java | 50 ++++++------
.../query/planner/logical/PlanFragmenter.java | 90 ++++++++++++----------
2 files changed, 77 insertions(+), 63 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
index 4659eb593c..07df7845b8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
@@ -18,6 +18,10 @@
*/
package org.apache.pinot.query.planner.logical;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntListIterator;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -83,10 +87,13 @@ public class PinotLogicalQueryPlanner {
for (Map.Entry<Integer, PlanNode> subPlanEntry : subPlanContext._subPlanIdToRootNodeMap.entrySet()) {
int subPlanId = subPlanEntry.getKey();
PlanNode subPlanRoot = subPlanEntry.getValue();
- PlanFragmenter.Context planFragmentContext = new PlanFragmenter.Context();
- planFragmentContext._planFragmentIdToRootNodeMap.put(1,
- new PlanFragment(1, subPlanRoot, new PlanFragmentMetadata(), new ArrayList<>()));
- subPlanRoot = subPlanRoot.visit(PlanFragmenter.INSTANCE, planFragmentContext);
+
+ // Fragment the SubPlan into multiple PlanFragments.
+ PlanFragmenter fragmenter = new PlanFragmenter();
+ PlanFragmenter.Context fragmenterContext = fragmenter.createContext();
+ subPlanRoot = subPlanRoot.visit(fragmenter, fragmenterContext);
+ Int2ObjectOpenHashMap<PlanFragment> planFragmentMap = fragmenter.getPlanFragmentMap();
+ Int2ObjectOpenHashMap<IntList> childPlanFragmentIdsMap = fragmenter.getChildPlanFragmentIdsMap();
// Sub plan root needs to send final results back to the Broker
// TODO: Should be SINGLETON (currently SINGLETON has to be local, so use BROADCAST_DISTRIBUTED instead)
@@ -95,27 +102,24 @@ public class PinotLogicalQueryPlanner {
RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
false);
subPlanRootSenderNode.addInput(subPlanRoot);
- subPlanRoot = new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(),
- RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null, false,
- false, subPlanRootSenderNode);
- PlanFragment planFragment1 = planFragmentContext._planFragmentIdToRootNodeMap.get(1);
- planFragmentContext._planFragmentIdToRootNodeMap.put(1,
- new PlanFragment(1, subPlanRootSenderNode, planFragment1.getFragmentMetadata(), planFragment1.getChildren()));
- PlanFragment rootPlanFragment =
- new PlanFragment(subPlanRoot.getPlanFragmentId(), subPlanRoot, new PlanFragmentMetadata(),
- Collections.singletonList(planFragmentContext._planFragmentIdToRootNodeMap.get(1)));
- planFragmentContext._planFragmentIdToRootNodeMap.put(0, rootPlanFragment);
- for (Map.Entry<Integer, List<Integer>> planFragmentToChildrenEntry
- : planFragmentContext._planFragmentIdToChildrenMap.entrySet()) {
- int planFragmentId = planFragmentToChildrenEntry.getKey();
- List<Integer> planFragmentChildren = planFragmentToChildrenEntry.getValue();
- for (int planFragmentChild : planFragmentChildren) {
- planFragmentContext._planFragmentIdToRootNodeMap.get(planFragmentId).getChildren()
- .add(planFragmentContext._planFragmentIdToRootNodeMap.get(planFragmentChild));
+ PlanFragment planFragment1 =
+ new PlanFragment(1, subPlanRootSenderNode, new PlanFragmentMetadata(), new ArrayList<>());
+ planFragmentMap.put(1, planFragment1);
+ for (Int2ObjectMap.Entry<IntList> entry : childPlanFragmentIdsMap.int2ObjectEntrySet()) {
+ PlanFragment planFragment = planFragmentMap.get(entry.getIntKey());
+ List<PlanFragment> childPlanFragments = planFragment.getChildren();
+ IntListIterator childPlanFragmentIdIterator = entry.getValue().iterator();
+ while (childPlanFragmentIdIterator.hasNext()) {
+ childPlanFragments.add(planFragmentMap.get(childPlanFragmentIdIterator.nextInt()));
}
}
- SubPlan subPlan = new SubPlan(planFragmentContext._planFragmentIdToRootNodeMap.get(0),
- subPlanContext._subPlanIdToMetadataMap.get(0), new ArrayList<>());
+ MailboxReceiveNode rootReceiveNode =
+ new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(),
+ RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
+ false, false, subPlanRootSenderNode);
+ PlanFragment rootPlanFragment =
+ new PlanFragment(0, rootReceiveNode, new PlanFragmentMetadata(), Collections.singletonList(planFragment1));
+ SubPlan subPlan = new SubPlan(rootPlanFragment, subPlanContext._subPlanIdToMetadataMap.get(0), new ArrayList<>());
subPlanMap.put(subPlanId, subPlan);
}
for (Map.Entry<Integer, List<Integer>> subPlanToChildrenEntry : subPlanContext._subPlanIdToChildrenMap.entrySet()) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
index fdb5858fdd..9d7e5a2fdd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
@@ -18,14 +18,16 @@
*/
package org.apache.pinot.query.planner.logical;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.PlanFragmentMetadata;
+import org.apache.pinot.query.planner.SubPlan;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
import org.apache.pinot.query.planner.plannode.FilterNode;
@@ -43,26 +45,39 @@ import org.apache.pinot.query.planner.plannode.WindowNode;
/**
- * PlanFragmenter is an implementation of {@link PlanNodeVisitor} to fragment a
- * {@link org.apache.pinot.query.planner.SubPlan} into multiple {@link PlanFragment}.
+ * PlanFragmenter is an implementation of {@link PlanNodeVisitor} to fragment a {@link SubPlan} into multiple
+ * {@link PlanFragment}s.
*
* The fragmenting process is as follows:
* 1. Traverse the plan tree in a depth-first manner;
* 2. For each node, if it is a PlanFragment splittable ExchangeNode, split it into {@link MailboxReceiveNode} and
* {@link MailboxSendNode} pair;
- * 3. Assign current PlanFragment Id to {@link MailboxReceiveNode};
- * 4. Increment current PlanFragment Id by one and assign it to the {@link MailboxSendNode}.
+ * 3. Assign current PlanFragment ID to {@link MailboxReceiveNode};
+ * 4. Increment current PlanFragment ID by one and assign it to the {@link MailboxSendNode}.
*/
public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.Context> {
- public static final PlanFragmenter INSTANCE = new PlanFragmenter();
+ private final Int2ObjectOpenHashMap<PlanFragment> _planFragmentMap = new Int2ObjectOpenHashMap<>();
+ private final Int2ObjectOpenHashMap<IntList> _childPlanFragmentIdsMap = new Int2ObjectOpenHashMap<>();
+
+ // ROOT PlanFragment ID is 0, current PlanFragment ID starts with 1, next PlanFragment ID starts with 2.
+ private int _nextPlanFragmentId = 2;
+
+ public Context createContext() {
+ // ROOT PlanFragment ID is 0, current PlanFragment ID starts with 1.
+ return new Context(1);
+ }
+
+ public Int2ObjectOpenHashMap<PlanFragment> getPlanFragmentMap() {
+ return _planFragmentMap;
+ }
+
+ public Int2ObjectOpenHashMap<IntList> getChildPlanFragmentIdsMap() {
+ return _childPlanFragmentIdsMap;
+ }
private PlanNode process(PlanNode node, Context context) {
node.setPlanFragmentId(context._currentPlanFragmentId);
- List<PlanNode> inputs = node.getInputs();
- for (int i = 0; i < inputs.size(); i++) {
- context._previousPlanFragmentId = node.getPlanFragmentId();
- inputs.set(i, inputs.get(i).visit(this, context));
- }
+ node.getInputs().replaceAll(planNode -> planNode.visit(this, context));
return node;
}
@@ -126,36 +141,33 @@ public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.
if (!isPlanFragmentSplitter(node)) {
return process(node, context);
}
- int currentPlanFragmentId = context._previousPlanFragmentId;
- int nextPlanFragmentId = ++context._currentPlanFragmentId;
- // Set previous PlanFragment ID in the context to be the next PlanFragment ID to be used by the child node.
- context._previousPlanFragmentId = nextPlanFragmentId;
- PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, context);
+ // Split the ExchangeNode to a MailboxReceiveNode and a MailboxSendNode, where MailboxReceiveNode is the leave node
+ // of the current PlanFragment, and MailboxSendNode is the root node of the next PlanFragment.
+ int receiverPlanFragmentId = context._currentPlanFragmentId;
+ int senderPlanFragmentId = _nextPlanFragmentId++;
+ _childPlanFragmentIdsMap.computeIfAbsent(receiverPlanFragmentId, k -> new IntArrayList())
+ .add(senderPlanFragmentId);
+
+ // Create a new context for the next PlanFragment with MailboxSendNode as the root node.
+ PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, new Context(senderPlanFragmentId));
PinotRelExchangeType exchangeType = node.getExchangeType();
RelDistribution.Type distributionType = node.getDistributionType();
// NOTE: Only HASH_DISTRIBUTED requires distribution keys
// TODO: Revisit ExchangeNode creation logic to avoid using HASH_DISTRIBUTED with empty distribution keys
List<Integer> distributionKeys =
distributionType == RelDistribution.Type.HASH_DISTRIBUTED ? node.getDistributionKeys() : null;
-
- PlanNode mailboxSender =
- new MailboxSendNode(nextPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), currentPlanFragmentId,
+ MailboxSendNode mailboxSendNode =
+ new MailboxSendNode(senderPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), receiverPlanFragmentId,
distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender());
- PlanNode mailboxReceiver =
- new MailboxReceiveNode(currentPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), nextPlanFragmentId,
- distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(),
- node.isSortOnReceiver(), mailboxSender);
- mailboxSender.addInput(nextPlanFragmentRoot);
-
- context._planFragmentIdToRootNodeMap.put(nextPlanFragmentId,
- new PlanFragment(nextPlanFragmentId, mailboxSender, new PlanFragmentMetadata(), new ArrayList<>()));
- if (!context._planFragmentIdToChildrenMap.containsKey(currentPlanFragmentId)) {
- context._planFragmentIdToChildrenMap.put(currentPlanFragmentId, new ArrayList<>());
- }
- context._planFragmentIdToChildrenMap.get(currentPlanFragmentId).add(nextPlanFragmentId);
-
- return mailboxReceiver;
+ mailboxSendNode.addInput(nextPlanFragmentRoot);
+ _planFragmentMap.put(senderPlanFragmentId,
+ new PlanFragment(senderPlanFragmentId, mailboxSendNode, new PlanFragmentMetadata(), new ArrayList<>()));
+
+ // Return the MailboxReceiveNode as the leave node of the current PlanFragment.
+ return new MailboxReceiveNode(receiverPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), senderPlanFragmentId,
+ distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(),
+ node.isSortOnReceiver(), mailboxSendNode);
}
private boolean isPlanFragmentSplitter(PlanNode node) {
@@ -163,12 +175,10 @@ public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.
}
public static class Context {
+ private final int _currentPlanFragmentId;
- // PlanFragment ID starts with 1, 0 will be reserved for ROOT PlanFragment.
- Integer _currentPlanFragmentId = 1;
- Integer _previousPlanFragmentId = 1;
- Map<Integer, PlanFragment> _planFragmentIdToRootNodeMap = new HashMap<>();
-
- Map<Integer, List<Integer>> _planFragmentIdToChildrenMap = new HashMap<>();
+ private Context(int currentPlanFragmentId) {
+ _currentPlanFragmentId = currentPlanFragmentId;
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org