You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/01 20:02:22 UTC

(pinot) branch master updated: Refactor PlanFragmenter to make the logic more clear (#11912)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 03a9ec73cd Refactor PlanFragmenter to make the logic more clear (#11912)
03a9ec73cd is described below

commit 03a9ec73cd8faee5d6b0a27f09c78cbbc9e75eb5
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Nov 1 13:02:17 2023 -0700

    Refactor PlanFragmenter to make the logic more clear (#11912)
---
 .../planner/logical/PinotLogicalQueryPlanner.java  | 50 ++++++------
 .../query/planner/logical/PlanFragmenter.java      | 90 ++++++++++++----------
 2 files changed, 77 insertions(+), 63 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
index 4659eb593c..07df7845b8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.query.planner.logical;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntListIterator;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -83,10 +87,13 @@ public class PinotLogicalQueryPlanner {
     for (Map.Entry<Integer, PlanNode> subPlanEntry : subPlanContext._subPlanIdToRootNodeMap.entrySet()) {
       int subPlanId = subPlanEntry.getKey();
       PlanNode subPlanRoot = subPlanEntry.getValue();
-      PlanFragmenter.Context planFragmentContext = new PlanFragmenter.Context();
-      planFragmentContext._planFragmentIdToRootNodeMap.put(1,
-          new PlanFragment(1, subPlanRoot, new PlanFragmentMetadata(), new ArrayList<>()));
-      subPlanRoot = subPlanRoot.visit(PlanFragmenter.INSTANCE, planFragmentContext);
+
+      // Fragment the SubPlan into multiple PlanFragments.
+      PlanFragmenter fragmenter = new PlanFragmenter();
+      PlanFragmenter.Context fragmenterContext = fragmenter.createContext();
+      subPlanRoot = subPlanRoot.visit(fragmenter, fragmenterContext);
+      Int2ObjectOpenHashMap<PlanFragment> planFragmentMap = fragmenter.getPlanFragmentMap();
+      Int2ObjectOpenHashMap<IntList> childPlanFragmentIdsMap = fragmenter.getChildPlanFragmentIdsMap();
 
       // Sub plan root needs to send final results back to the Broker
       // TODO: Should be SINGLETON (currently SINGLETON has to be local, so use BROADCAST_DISTRIBUTED instead)
@@ -95,27 +102,24 @@ public class PinotLogicalQueryPlanner {
               RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
               false);
       subPlanRootSenderNode.addInput(subPlanRoot);
-      subPlanRoot = new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(),
-          RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null, false,
-          false, subPlanRootSenderNode);
-      PlanFragment planFragment1 = planFragmentContext._planFragmentIdToRootNodeMap.get(1);
-      planFragmentContext._planFragmentIdToRootNodeMap.put(1,
-          new PlanFragment(1, subPlanRootSenderNode, planFragment1.getFragmentMetadata(), planFragment1.getChildren()));
-      PlanFragment rootPlanFragment =
-          new PlanFragment(subPlanRoot.getPlanFragmentId(), subPlanRoot, new PlanFragmentMetadata(),
-              Collections.singletonList(planFragmentContext._planFragmentIdToRootNodeMap.get(1)));
-      planFragmentContext._planFragmentIdToRootNodeMap.put(0, rootPlanFragment);
-      for (Map.Entry<Integer, List<Integer>> planFragmentToChildrenEntry
-          : planFragmentContext._planFragmentIdToChildrenMap.entrySet()) {
-        int planFragmentId = planFragmentToChildrenEntry.getKey();
-        List<Integer> planFragmentChildren = planFragmentToChildrenEntry.getValue();
-        for (int planFragmentChild : planFragmentChildren) {
-          planFragmentContext._planFragmentIdToRootNodeMap.get(planFragmentId).getChildren()
-              .add(planFragmentContext._planFragmentIdToRootNodeMap.get(planFragmentChild));
+      PlanFragment planFragment1 =
+          new PlanFragment(1, subPlanRootSenderNode, new PlanFragmentMetadata(), new ArrayList<>());
+      planFragmentMap.put(1, planFragment1);
+      for (Int2ObjectMap.Entry<IntList> entry : childPlanFragmentIdsMap.int2ObjectEntrySet()) {
+        PlanFragment planFragment = planFragmentMap.get(entry.getIntKey());
+        List<PlanFragment> childPlanFragments = planFragment.getChildren();
+        IntListIterator childPlanFragmentIdIterator = entry.getValue().iterator();
+        while (childPlanFragmentIdIterator.hasNext()) {
+          childPlanFragments.add(planFragmentMap.get(childPlanFragmentIdIterator.nextInt()));
         }
       }
-      SubPlan subPlan = new SubPlan(planFragmentContext._planFragmentIdToRootNodeMap.get(0),
-          subPlanContext._subPlanIdToMetadataMap.get(0), new ArrayList<>());
+      MailboxReceiveNode rootReceiveNode =
+          new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(),
+              RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
+              false, false, subPlanRootSenderNode);
+      PlanFragment rootPlanFragment =
+          new PlanFragment(0, rootReceiveNode, new PlanFragmentMetadata(), Collections.singletonList(planFragment1));
+      SubPlan subPlan = new SubPlan(rootPlanFragment, subPlanContext._subPlanIdToMetadataMap.get(0), new ArrayList<>());
       subPlanMap.put(subPlanId, subPlan);
     }
     for (Map.Entry<Integer, List<Integer>> subPlanToChildrenEntry : subPlanContext._subPlanIdToChildrenMap.entrySet()) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
index fdb5858fdd..9d7e5a2fdd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
@@ -18,14 +18,16 @@
  */
 package org.apache.pinot.query.planner.logical;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.PlanFragmentMetadata;
+import org.apache.pinot.query.planner.SubPlan;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.apache.pinot.query.planner.plannode.ExchangeNode;
 import org.apache.pinot.query.planner.plannode.FilterNode;
@@ -43,26 +45,39 @@ import org.apache.pinot.query.planner.plannode.WindowNode;
 
 
 /**
- * PlanFragmenter is an implementation of {@link PlanNodeVisitor} to fragment a
- * {@link org.apache.pinot.query.planner.SubPlan} into multiple {@link PlanFragment}.
+ * PlanFragmenter is an implementation of {@link PlanNodeVisitor} to fragment a {@link SubPlan} into multiple
+ * {@link PlanFragment}s.
  *
  * The fragmenting process is as follows:
  * 1. Traverse the plan tree in a depth-first manner;
  * 2. For each node, if it is a PlanFragment splittable ExchangeNode, split it into {@link MailboxReceiveNode} and
  * {@link MailboxSendNode} pair;
- * 3. Assign current PlanFragment Id to {@link MailboxReceiveNode};
- * 4. Increment current PlanFragment Id by one and assign it to the {@link MailboxSendNode}.
+ * 3. Assign the current PlanFragment ID to the {@link MailboxReceiveNode};
+ * 4. Assign the next unused PlanFragment ID (from a per-fragmenter counter) to the {@link MailboxSendNode}.
  */
 public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.Context> {
-  public static final PlanFragmenter INSTANCE = new PlanFragmenter();
+  private final Int2ObjectOpenHashMap<PlanFragment> _planFragmentMap = new Int2ObjectOpenHashMap<>();
+  private final Int2ObjectOpenHashMap<IntList> _childPlanFragmentIdsMap = new Int2ObjectOpenHashMap<>();
+
+  // ROOT PlanFragment ID is 0, current PlanFragment ID starts with 1, next PlanFragment ID starts with 2.
+  private int _nextPlanFragmentId = 2;
+
+  public Context createContext() {
+    // ROOT PlanFragment ID is 0, current PlanFragment ID starts with 1.
+    return new Context(1);
+  }
+
+  public Int2ObjectOpenHashMap<PlanFragment> getPlanFragmentMap() {
+    return _planFragmentMap;
+  }
+
+  public Int2ObjectOpenHashMap<IntList> getChildPlanFragmentIdsMap() {
+    return _childPlanFragmentIdsMap;
+  }
 
   private PlanNode process(PlanNode node, Context context) {
     node.setPlanFragmentId(context._currentPlanFragmentId);
-    List<PlanNode> inputs = node.getInputs();
-    for (int i = 0; i < inputs.size(); i++) {
-      context._previousPlanFragmentId = node.getPlanFragmentId();
-      inputs.set(i, inputs.get(i).visit(this, context));
-    }
+    node.getInputs().replaceAll(planNode -> planNode.visit(this, context));
     return node;
   }
 
@@ -126,36 +141,33 @@ public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.
     if (!isPlanFragmentSplitter(node)) {
       return process(node, context);
     }
-    int currentPlanFragmentId = context._previousPlanFragmentId;
-    int nextPlanFragmentId = ++context._currentPlanFragmentId;
-    // Set previous PlanFragment ID in the context to be the next PlanFragment ID to be used by the child node.
-    context._previousPlanFragmentId = nextPlanFragmentId;
-    PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, context);
 
+    // Split the ExchangeNode into a MailboxReceiveNode and a MailboxSendNode, where MailboxReceiveNode is the leaf node
+    // of the current PlanFragment, and MailboxSendNode is the root node of the next PlanFragment.
+    int receiverPlanFragmentId = context._currentPlanFragmentId;
+    int senderPlanFragmentId = _nextPlanFragmentId++;
+    _childPlanFragmentIdsMap.computeIfAbsent(receiverPlanFragmentId, k -> new IntArrayList())
+        .add(senderPlanFragmentId);
+
+    // Create a new context for the next PlanFragment with MailboxSendNode as the root node.
+    PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, new Context(senderPlanFragmentId));
     PinotRelExchangeType exchangeType = node.getExchangeType();
     RelDistribution.Type distributionType = node.getDistributionType();
     // NOTE: Only HASH_DISTRIBUTED requires distribution keys
     // TODO: Revisit ExchangeNode creation logic to avoid using HASH_DISTRIBUTED with empty distribution keys
     List<Integer> distributionKeys =
         distributionType == RelDistribution.Type.HASH_DISTRIBUTED ? node.getDistributionKeys() : null;
-
-    PlanNode mailboxSender =
-        new MailboxSendNode(nextPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), currentPlanFragmentId,
+    MailboxSendNode mailboxSendNode =
+        new MailboxSendNode(senderPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), receiverPlanFragmentId,
             distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender());
-    PlanNode mailboxReceiver =
-        new MailboxReceiveNode(currentPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), nextPlanFragmentId,
-            distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(),
-            node.isSortOnReceiver(), mailboxSender);
-    mailboxSender.addInput(nextPlanFragmentRoot);
-
-    context._planFragmentIdToRootNodeMap.put(nextPlanFragmentId,
-        new PlanFragment(nextPlanFragmentId, mailboxSender, new PlanFragmentMetadata(), new ArrayList<>()));
-    if (!context._planFragmentIdToChildrenMap.containsKey(currentPlanFragmentId)) {
-      context._planFragmentIdToChildrenMap.put(currentPlanFragmentId, new ArrayList<>());
-    }
-    context._planFragmentIdToChildrenMap.get(currentPlanFragmentId).add(nextPlanFragmentId);
-
-    return mailboxReceiver;
+    mailboxSendNode.addInput(nextPlanFragmentRoot);
+    _planFragmentMap.put(senderPlanFragmentId,
+        new PlanFragment(senderPlanFragmentId, mailboxSendNode, new PlanFragmentMetadata(), new ArrayList<>()));
+
+    // Return the MailboxReceiveNode as the leaf node of the current PlanFragment.
+    return new MailboxReceiveNode(receiverPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), senderPlanFragmentId,
+        distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(),
+        node.isSortOnReceiver(), mailboxSendNode);
   }
 
   private boolean isPlanFragmentSplitter(PlanNode node) {
@@ -163,12 +175,10 @@ public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.
   }
 
   public static class Context {
+    private final int _currentPlanFragmentId;
 
-    // PlanFragment ID starts with 1, 0 will be reserved for ROOT PlanFragment.
-    Integer _currentPlanFragmentId = 1;
-    Integer _previousPlanFragmentId = 1;
-    Map<Integer, PlanFragment> _planFragmentIdToRootNodeMap = new HashMap<>();
-
-    Map<Integer, List<Integer>> _planFragmentIdToChildrenMap = new HashMap<>();
+    private Context(int currentPlanFragmentId) {
+      _currentPlanFragmentId = currentPlanFragmentId;
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org