You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2023/05/07 06:58:04 UTC

[pinot] branch master updated: Refactor all StageNodes to PlanNodes during query planning phase (#10735)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c3dd06a9b Refactor all StageNodes to PlanNodes during query planning phase (#10735)
7c3dd06a9b is described below

commit 7c3dd06a9b0966551b84512af97259c6c5c35537
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Sat May 6 23:57:58 2023 -0700

    Refactor all StageNodes to PlanNodes during query planning phase (#10735)
---
 .../PinotAggregateExchangeNodeInsertRule.java      |  7 +-
 ...ageVisitor.java => ExplainPlanPlanVisitor.java} | 50 +++++++-------
 .../apache/pinot/query/planner/PlannerUtils.java   |  8 +--
 .../org/apache/pinot/query/planner/QueryPlan.java  | 40 ++++++------
 .../query/planner/logical/RelToStageConverter.java | 46 ++++++-------
 .../planner/logical/ShuffleRewriteVisitor.java     | 38 +++++------
 .../query/planner/logical/StageFragmenter.java     | 76 +++++++++++-----------
 .../pinot/query/planner/logical/StagePlanner.java  | 27 ++++----
 .../planner/physical/DispatchablePlanContext.java  |  6 +-
 .../planner/physical/DispatchablePlanMetadata.java |  9 +--
 .../planner/physical/DispatchablePlanVisitor.java  | 57 ++++++++--------
 .../planner/physical/MailboxAssignmentVisitor.java | 29 +++++----
 .../colocated/GreedyShuffleRewriteContext.java     | 76 +++++++++++-----------
 .../GreedyShuffleRewritePreComputeVisitor.java     | 38 +++++------
 .../colocated/GreedyShuffleRewriteVisitor.java     | 70 ++++++++++----------
 .../AbstractPlanNode.java}                         | 30 ++++-----
 .../planner/{stage => plannode}/AggregateNode.java | 15 +++--
 .../DefaultPostOrderTraversalVisitor.java          |  6 +-
 .../planner/{stage => plannode}/ExchangeNode.java  | 10 +--
 .../planner/{stage => plannode}/FilterNode.java    | 10 +--
 .../planner/{stage => plannode}/JoinNode.java      | 14 ++--
 .../{stage => plannode}/MailboxReceiveNode.java    | 20 +++---
 .../{stage => plannode}/MailboxSendNode.java       | 14 ++--
 .../StageNode.java => plannode/PlanNode.java}      | 20 +++---
 .../PlanNodeVisitor.java}                          | 15 +++--
 .../planner/{stage => plannode}/ProjectNode.java   | 10 +--
 .../planner/{stage => plannode}/SetOpNode.java     | 14 ++--
 .../planner/{stage => plannode}/SortNode.java      | 15 +++--
 .../{stage => plannode}/StageNodeSerDeUtils.java   | 52 +++++++--------
 .../planner/{stage => plannode}/TableScanNode.java | 14 ++--
 .../planner/{stage => plannode}/ValueNode.java     | 10 +--
 .../planner/{stage => plannode}/WindowNode.java    | 15 +++--
 .../pinot/query/planner/serde/ProtoProperties.java |  3 +-
 ...tageMetadata.java => PlanFragmentMetadata.java} | 14 ++--
 .../apache/pinot/query/routing/WorkerManager.java  |  6 +-
 .../apache/pinot/query/routing/WorkerMetadata.java |  4 +-
 .../apache/pinot/query/QueryCompilationTest.java   | 68 ++++++++++---------
 .../{stage => plannode}/SerDeUtilsTest.java        | 16 ++---
 .../apache/pinot/query/runtime/QueryRunner.java    | 20 +++---
 .../query/runtime/operator/HashJoinOperator.java   |  2 +-
 .../runtime/operator/WindowAggregateOperator.java  |  2 +-
 .../runtime/operator/utils/OperatorUtils.java      |  8 +--
 .../query/runtime/plan/DistributedStagePlan.java   | 28 ++++----
 .../runtime/plan/OpChainExecutionContext.java      | 12 ++--
 .../query/runtime/plan/PhysicalPlanVisitor.java    | 42 ++++++------
 .../query/runtime/plan/PlanRequestContext.java     | 12 ++--
 .../runtime/plan/ServerRequestPlanVisitor.java     | 36 +++++-----
 .../runtime/plan/serde/QueryPlanSerDeUtils.java    | 18 ++---
 .../plan/server/ServerPlanRequestContext.java      |  6 +-
 .../query/service/dispatch/QueryDispatcher.java    | 12 ++--
 .../pinot/query/runtime/QueryRunnerTest.java       |  2 +-
 .../pinot/query/runtime/QueryRunnerTestBase.java   |  2 +-
 .../runtime/operator/HashJoinOperatorTest.java     |  2 +-
 .../operator/MailboxReceiveOperatorTest.java       | 34 +++++-----
 .../runtime/operator/MailboxSendOperatorTest.java  |  6 +-
 .../pinot/query/runtime/operator/OpChainTest.java  | 14 ++--
 .../operator/SortedMailboxReceiveOperatorTest.java | 38 +++++------
 .../operator/WindowAggregateOperatorTest.java      |  2 +-
 .../pinot/query/service/QueryServerTest.java       | 33 +++++-----
 .../service/dispatch/QueryDispatcherTest.java      |  2 +-
 60 files changed, 663 insertions(+), 642 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
index 99736870bf..6af2c983fb 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -51,7 +51,7 @@ import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
 
 
 /**
@@ -95,8 +95,9 @@ public class PinotAggregateExchangeNodeInsertRule extends RelOptRule {
   }
 
   /**
-   * Split the AGG into 2 stages, both with the same AGG type,
-   * Pinot internal stage optimization can use the info of the input data type to infer whether it should generate
+   * Split the AGG into 2 plan fragments, both with the same AGG type,
+   * Pinot internal plan fragment optimization can use the info of the input data type to infer whether it should
+   * generate
    * the "intermediate-stage AGG operator" or a "leaf-stage AGG operator"
    * @see org.apache.pinot.core.query.aggregation.function.AggregationFunction
    *
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanPlanVisitor.java
similarity index 84%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanPlanVisitor.java
index e639701256..7b5e935010 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanPlanVisitor.java
@@ -23,20 +23,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.routing.QueryServerInstance;
 
 
@@ -46,7 +46,7 @@ import org.apache.pinot.query.routing.QueryServerInstance;
  * <p>It is currently not used programmatically and cannot be accessed by the user. Instead,
  * it is intended for use in manual debugging (e.g. setting breakpoints and calling QueryPlan#explain()).
  */
-public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder, ExplainPlanStageVisitor.Context> {
+public class ExplainPlanPlanVisitor implements PlanNodeVisitor<StringBuilder, ExplainPlanPlanVisitor.Context> {
 
   private final QueryPlan _queryPlan;
 
@@ -80,23 +80,23 @@ public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder,
    *
    * @return a query plan associated with
    */
-  public static String explainFrom(QueryPlan queryPlan, StageNode node, QueryServerInstance rootServer) {
-    final ExplainPlanStageVisitor visitor = new ExplainPlanStageVisitor(queryPlan);
+  public static String explainFrom(QueryPlan queryPlan, PlanNode node, QueryServerInstance rootServer) {
+    final ExplainPlanPlanVisitor visitor = new ExplainPlanPlanVisitor(queryPlan);
     return node
         .visit(visitor, new Context(rootServer, 0, "", "", new StringBuilder()))
         .toString();
   }
 
-  private ExplainPlanStageVisitor(QueryPlan queryPlan) {
+  private ExplainPlanPlanVisitor(QueryPlan queryPlan) {
     _queryPlan = queryPlan;
   }
 
-  private StringBuilder appendInfo(StageNode node, Context context) {
-    int stage = node.getStageId();
+  private StringBuilder appendInfo(PlanNode node, Context context) {
+    int planFragmentId = node.getPlanFragmentId();
     context._builder
         .append(context._prefix)
         .append('[')
-        .append(stage)
+        .append(planFragmentId)
         .append("]@")
         .append(context._host.getHostname())
         .append(':')
@@ -106,7 +106,7 @@ public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder,
     return context._builder;
   }
 
-  private StringBuilder visitSimpleNode(StageNode node, Context context) {
+  private StringBuilder visitSimpleNode(PlanNode node, Context context) {
     appendInfo(node, context).append('\n');
     return node.getInputs().get(0).visit(this, context.next(false, context._host, context._workerId));
   }
@@ -124,7 +124,7 @@ public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder,
   @Override
   public StringBuilder visitSetOp(SetOpNode setOpNode, Context context) {
     appendInfo(setOpNode, context).append('\n');
-    for (StageNode input : setOpNode.getInputs()) {
+    for (PlanNode input : setOpNode.getInputs()) {
       input.visit(this, context.next(false, context._host, context._workerId));
     }
     return context._builder;
@@ -195,7 +195,7 @@ public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder,
         .getServerInstanceToWorkerIdMap();
     context._builder.append("->");
     String receivers = servers.entrySet().stream()
-        .map(ExplainPlanStageVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+        .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
         .map(s -> "[" + receiverStageId + "]@" + s)
         .collect(Collectors.joining(",", "{", "}"));
     return context._builder.append(receivers);
@@ -216,7 +216,7 @@ public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder,
     return appendInfo(node, context)
         .append(' ')
         .append(_queryPlan.getDispatchablePlanMetadataMap()
-            .get(node.getStageId())
+            .get(node.getPlanFragmentId())
             .getWorkerIdToSegmentsMap()
             .get(context._host))
         .append('\n');
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
index c3ce9fc116..ad92eab6e8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
@@ -36,12 +36,12 @@ public class PlannerUtils {
     // do not instantiate.
   }
 
-  public static boolean isRootStage(int stageId) {
-    return stageId == 0;
+  public static boolean isRootPlanFragment(int planFragmentId) {
+    return planFragmentId == 0;
   }
 
-  public static boolean isFinalStage(int stageId) {
-    return stageId == 1;
+  public static boolean isFinalPlanFragment(int planFragmentId) {
+    return planFragmentId == 1;
   }
 
   public static String explainPlan(RelNode relRoot, SqlExplainFormat format, SqlExplainLevel explainLevel) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
index e21e1923ec..25f410cf84 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
@@ -23,10 +23,10 @@ import java.util.List;
 import java.util.Map;
 import org.apache.calcite.util.Pair;
 import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 
@@ -45,32 +45,32 @@ import org.apache.pinot.query.routing.WorkerMetadata;
  */
 public class QueryPlan {
   private final List<Pair<Integer, String>> _queryResultFields;
-  private final Map<Integer, StageNode> _queryStageMap;
-  private final List<StageMetadata> _stageMetadataList;
+  private final Map<Integer, PlanNode> _queryStageMap;
+  private final List<PlanFragmentMetadata> _planFragmentMetadataList;
   private final Map<Integer, DispatchablePlanMetadata> _dispatchablePlanMetadataMap;
 
-  public QueryPlan(List<Pair<Integer, String>> fields, Map<Integer, StageNode> queryStageMap,
+  public QueryPlan(List<Pair<Integer, String>> fields, Map<Integer, PlanNode> queryStageMap,
       Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap) {
     _queryResultFields = fields;
     _queryStageMap = queryStageMap;
     _dispatchablePlanMetadataMap = dispatchablePlanMetadataMap;
-    _stageMetadataList = constructStageMetadataList(_dispatchablePlanMetadataMap);
+    _planFragmentMetadataList = constructStageMetadataList(_dispatchablePlanMetadataMap);
   }
 
   /**
    * Get the map between stageID and the stage plan root node.
    * @return stage plan map.
    */
-  public Map<Integer, StageNode> getQueryStageMap() {
+  public Map<Integer, PlanNode> getQueryStageMap() {
     return _queryStageMap;
   }
 
   /**
-   * Get the stage metadata information based on stageId.
+   * Get the stage metadata information based on planFragmentId.
    * @return stage metadata info.
    */
-  public StageMetadata getStageMetadata(int stageId) {
-    return _stageMetadataList.get(stageId);
+  public PlanFragmentMetadata getStageMetadata(int planFragmentId) {
+    return _planFragmentMetadataList.get(planFragmentId);
   }
 
   /**
@@ -93,21 +93,21 @@ public class QueryPlan {
    * Explains the {@code QueryPlan}
    *
    * @return a human-readable tree explaining the query plan
-   * @see ExplainPlanStageVisitor#explain(QueryPlan)
+   * @see ExplainPlanPlanVisitor#explain(QueryPlan)
    * @apiNote this is <b>NOT</b> identical to the SQL {@code EXPLAIN PLAN FOR} functionality
    *          and is instead intended to be used by developers debugging during feature
    *          development
    */
   public String explain() {
-    return ExplainPlanStageVisitor.explain(this);
+    return ExplainPlanPlanVisitor.explain(this);
   }
 
   /**
    * Convert the {@link DispatchablePlanMetadata} into dispatchable info for each stage/worker.
    */
-  private static List<StageMetadata> constructStageMetadataList(
+  private static List<PlanFragmentMetadata> constructStageMetadataList(
       Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap) {
-    StageMetadata[] stageMetadataList = new StageMetadata[dispatchablePlanMetadataMap.size()];
+    PlanFragmentMetadata[] planFragmentMetadataList = new PlanFragmentMetadata[dispatchablePlanMetadataMap.size()];
     for (Map.Entry<Integer, DispatchablePlanMetadata> dispatchableEntry : dispatchablePlanMetadataMap.entrySet()) {
       DispatchablePlanMetadata dispatchablePlanMetadata = dispatchableEntry.getValue();
 
@@ -119,9 +119,9 @@ public class QueryPlan {
           VirtualServerAddress virtualServerAddress = new VirtualServerAddress(queryServerEntry.getKey(), workerId);
           WorkerMetadata.Builder builder = new WorkerMetadata.Builder();
           builder.setVirtualServerAddress(virtualServerAddress);
-          Map<Integer, MailboxMetadata> stageToMailboxMetadata =
+          Map<Integer, MailboxMetadata> planFragmentToMailboxMetadata =
               dispatchablePlanMetadata.getWorkerIdToMailBoxIdsMap().get(workerId);
-          builder.putAllMailBoxInfosMap(stageToMailboxMetadata);
+          builder.putAllMailBoxInfosMap(planFragmentToMailboxMetadata);
           if (dispatchablePlanMetadata.getScannedTables().size() == 1) {
             builder.addTableSegmentsMap(dispatchablePlanMetadata.getWorkerIdToSegmentsMap().get(workerId));
           }
@@ -130,8 +130,8 @@ public class QueryPlan {
       }
 
       // construct the stageMetadata
-      int stageId = dispatchableEntry.getKey();
-      StageMetadata.Builder builder = new StageMetadata.Builder();
+      int planFragmentId = dispatchableEntry.getKey();
+      PlanFragmentMetadata.Builder builder = new PlanFragmentMetadata.Builder();
       builder.setWorkerMetadataList(Arrays.asList(workerMetadataList));
       if (dispatchablePlanMetadata.getScannedTables().size() == 1) {
         builder.addTableName(dispatchablePlanMetadata.getScannedTables().get(0));
@@ -139,8 +139,8 @@ public class QueryPlan {
       if (dispatchablePlanMetadata.getTimeBoundaryInfo() != null) {
         builder.addTimeBoundaryInfo(dispatchablePlanMetadata.getTimeBoundaryInfo());
       }
-      stageMetadataList[stageId] = builder.build();
+      planFragmentMetadataList[planFragmentId] = builder.build();
     }
-    return Arrays.asList(stageMetadataList);
+    return Arrays.asList(planFragmentMetadataList);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index ea0c46b390..3d353b2514 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -44,22 +44,22 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
 /**
- * The {@code StageNodeConverter} converts a logical {@link RelNode} to a {@link StageNode}.
+ * The {@code RelToStageConverter} converts a logical {@link RelNode} to a {@link PlanNode}.
  */
 public final class RelToStageConverter {
 
@@ -75,7 +75,7 @@ public final class RelToStageConverter {
    * @param node relational node
    * @return stage node.
    */
-  public static StageNode toStageNode(RelNode node, int currentStageId) {
+  public static PlanNode toStageNode(RelNode node, int currentStageId) {
     if (node instanceof LogicalTableScan) {
       return convertLogicalTableScan((LogicalTableScan) node, currentStageId);
     } else if (node instanceof LogicalJoin) {
@@ -101,7 +101,7 @@ public final class RelToStageConverter {
     }
   }
 
-  private static StageNode convertLogicalExchange(Exchange node, int currentStageId) {
+  private static PlanNode convertLogicalExchange(Exchange node, int currentStageId) {
     RelCollation collation = null;
     boolean isSortOnSender = false;
     boolean isSortOnReceiver = false;
@@ -118,47 +118,47 @@ public final class RelToStageConverter {
         isSortOnSender, isSortOnReceiver);
   }
 
-  private static StageNode convertLogicalSetOp(SetOp node, int currentStageId) {
+  private static PlanNode convertLogicalSetOp(SetOp node, int currentStageId) {
     return new SetOpNode(SetOpNode.SetOpType.fromObject(node), currentStageId, toDataSchema(node.getRowType()),
         node.all);
   }
 
-  private static StageNode convertLogicalValues(LogicalValues node, int currentStageId) {
+  private static PlanNode convertLogicalValues(LogicalValues node, int currentStageId) {
     return new ValueNode(currentStageId, toDataSchema(node.getRowType()), node.tuples);
   }
 
-  private static StageNode convertLogicalWindow(LogicalWindow node, int currentStageId) {
+  private static PlanNode convertLogicalWindow(LogicalWindow node, int currentStageId) {
     return new WindowNode(currentStageId, node.groups, node.constants, toDataSchema(node.getRowType()));
   }
 
-  private static StageNode convertLogicalSort(LogicalSort node, int currentStageId) {
+  private static PlanNode convertLogicalSort(LogicalSort node, int currentStageId) {
     int fetch = RexExpressionUtils.getValueAsInt(node.fetch);
     int offset = RexExpressionUtils.getValueAsInt(node.offset);
     return new SortNode(currentStageId, node.getCollation().getFieldCollations(), fetch, offset,
         toDataSchema(node.getRowType()));
   }
 
-  private static StageNode convertLogicalAggregate(LogicalAggregate node, int currentStageId) {
+  private static PlanNode convertLogicalAggregate(LogicalAggregate node, int currentStageId) {
     return new AggregateNode(currentStageId, toDataSchema(node.getRowType()), node.getAggCallList(),
         RexExpression.toRexInputRefs(node.getGroupSet()), node.getHints());
   }
 
-  private static StageNode convertLogicalProject(LogicalProject node, int currentStageId) {
+  private static PlanNode convertLogicalProject(LogicalProject node, int currentStageId) {
     return new ProjectNode(currentStageId, toDataSchema(node.getRowType()), node.getProjects());
   }
 
-  private static StageNode convertLogicalFilter(LogicalFilter node, int currentStageId) {
+  private static PlanNode convertLogicalFilter(LogicalFilter node, int currentStageId) {
     return new FilterNode(currentStageId, toDataSchema(node.getRowType()), node.getCondition());
   }
 
-  private static StageNode convertLogicalTableScan(LogicalTableScan node, int currentStageId) {
+  private static PlanNode convertLogicalTableScan(LogicalTableScan node, int currentStageId) {
     String tableName = node.getTable().getQualifiedName().get(0);
     List<String> columnNames =
         node.getRowType().getFieldList().stream().map(RelDataTypeField::getName).collect(Collectors.toList());
     return new TableScanNode(currentStageId, toDataSchema(node.getRowType()), tableName, columnNames);
   }
 
-  private static StageNode convertLogicalJoin(LogicalJoin node, int currentStageId) {
+  private static PlanNode convertLogicalJoin(LogicalJoin node, int currentStageId) {
     JoinRelType joinType = node.getJoinType();
 
     // Parse out all equality JOIN conditions
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
index 664ddd44ff..54d03da712 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
@@ -26,20 +26,20 @@ import java.util.Set;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
 
 
 /**
@@ -48,10 +48,10 @@ import org.apache.pinot.query.planner.stage.WindowNode;
  * a single host. It gathers the information recursively by checking which partitioned
  * data is selected by each node in the tree.
  *
- * <p>The only method that should be used externally is {@link #optimizeShuffles(StageNode)},
- * other public methods are used only by {@link StageNode#visit(StageNodeVisitor, Object)}.
+ * <p>The only method that should be used externally is {@link #optimizeShuffles(PlanNode)},
+ * other public methods are used only by {@link PlanNode#visit(PlanNodeVisitor, Object)}.
  */
-public class ShuffleRewriteVisitor implements StageNodeVisitor<Set<Integer>, Void> {
+public class ShuffleRewriteVisitor implements PlanNodeVisitor<Set<Integer>, Void> {
 
   /**
    * This method rewrites {@code root} <b>in place</b>, removing any unnecessary shuffles
@@ -59,12 +59,12 @@ public class ShuffleRewriteVisitor implements StageNodeVisitor<Set<Integer>, Voi
    *
    * @param root the root node of the tree to rewrite
    */
-  public static void optimizeShuffles(StageNode root) {
+  public static void optimizeShuffles(PlanNode root) {
     root.visit(new ShuffleRewriteVisitor(), null);
   }
 
   /**
-   * Access to this class should only be used via {@link #optimizeShuffles(StageNode)}
+   * Access to this class should only be used via {@link #optimizeShuffles(PlanNode)}
    */
   private ShuffleRewriteVisitor() {
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java
index e33638d824..d04443f04f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageFragmenter.java
@@ -22,96 +22,96 @@ import java.util.List;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
-
-
-public class StageFragmenter implements StageNodeVisitor<StageNode, StageFragmenter.Context> {
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+
+
+public class StageFragmenter implements PlanNodeVisitor<PlanNode, StageFragmenter.Context> {
   public static final StageFragmenter INSTANCE = new StageFragmenter();
 
-  private StageNode process(StageNode node, Context context) {
-    node.setStageId(context._currentStageId);
-    List<StageNode> inputs = node.getInputs();
+  private PlanNode process(PlanNode node, Context context) {
+    node.setPlanFragmentId(context._currentStageId);
+    List<PlanNode> inputs = node.getInputs();
     for (int i = 0; i < inputs.size(); i++) {
-      context._previousStageId = node.getStageId();
+      context._previousStageId = node.getPlanFragmentId();
       inputs.set(i, inputs.get(i).visit(this, context));
     }
     return node;
   }
 
   @Override
-  public StageNode visitAggregate(AggregateNode node, Context context) {
+  public PlanNode visitAggregate(AggregateNode node, Context context) {
     return process(node, context);
   }
 
   @Override
-  public StageNode visitFilter(FilterNode node, Context context) {
+  public PlanNode visitFilter(FilterNode node, Context context) {
     return process(node, context);
   }
 
   @Override
-  public StageNode visitJoin(JoinNode node, Context context) {
+  public PlanNode visitJoin(JoinNode node, Context context) {
     return process(node, context);
   }
 
   @Override
-  public StageNode visitMailboxReceive(MailboxReceiveNode node, Context context) {
+  public PlanNode visitMailboxReceive(MailboxReceiveNode node, Context context) {
     throw new UnsupportedOperationException("MailboxReceiveNode should not be visited by StageFragmenter");
   }
 
   @Override
-  public StageNode visitMailboxSend(MailboxSendNode node, Context context) {
+  public PlanNode visitMailboxSend(MailboxSendNode node, Context context) {
     throw new UnsupportedOperationException("MailboxSendNode should not be visited by StageFragmenter");
   }
 
   @Override
-  public StageNode visitProject(ProjectNode node, Context context) {
+  public PlanNode visitProject(ProjectNode node, Context context) {
     return process(node, context);
   }
 
   @Override
-  public StageNode visitSort(SortNode node, Context context) {
+  public PlanNode visitSort(SortNode node, Context context) {
     return process(node, context);
   }
 
   @Override
-  public StageNode visitTableScan(TableScanNode node, Context context) {
+  public PlanNode visitTableScan(TableScanNode node, Context context) {
     return process(node, context);
   }
 
   @Override
-  public StageNode visitValue(ValueNode node, Context context) {
+  public PlanNode visitValue(ValueNode node, Context context) {
     return process(node, context);
   }
 
   @Override
-  public StageNode visitWindow(WindowNode node, Context context) {
+  public PlanNode visitWindow(WindowNode node, Context context) {
     return process(node, context);
   }
 
   @Override
-  public StageNode visitSetOp(SetOpNode node, Context context) {
+  public PlanNode visitSetOp(SetOpNode node, Context context) {
     return process(node, context);
   }
 
   @Override
-  public StageNode visitExchange(ExchangeNode node, Context context) {
+  public PlanNode visitExchange(ExchangeNode node, Context context) {
     int nodeStageId = context._previousStageId;
 
     context._currentStageId++;
-    StageNode nextStageRoot = node.getInputs().get(0).visit(this, context);
+    PlanNode nextStageRoot = node.getInputs().get(0).visit(this, context);
 
     List<Integer> distributionKeys = node.getDistributionKeys();
     RelDistribution.Type exchangeType = node.getDistributionType();
@@ -122,10 +122,10 @@ public class StageFragmenter implements StageNodeVisitor<StageNode, StageFragmen
     KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
         ? new FieldSelectionKeySelector(distributionKeys) : null;
 
-    StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
+    PlanNode mailboxSender = new MailboxSendNode(nextStageRoot.getPlanFragmentId(), nextStageRoot.getDataSchema(),
         nodeStageId, exchangeType, keySelector, node.getCollations(), node.isSortOnSender());
-    StageNode mailboxReceiver = new MailboxReceiveNode(nodeStageId, nextStageRoot.getDataSchema(),
-        nextStageRoot.getStageId(), exchangeType, keySelector,
+    PlanNode mailboxReceiver = new MailboxReceiveNode(nodeStageId, nextStageRoot.getDataSchema(),
+        nextStageRoot.getPlanFragmentId(), exchangeType, keySelector,
         node.getCollations(), node.isSortOnSender(), node.isSortOnReceiver(), mailboxSender);
     mailboxSender.addInput(nextStageRoot);
 
@@ -134,7 +134,7 @@ public class StageFragmenter implements StageNodeVisitor<StageNode, StageFragmen
 
   public static class Context {
 
-    // Stage ID starts with 1, 0 will be reserved for ROOT stage.
+    // Stage ID starts with 1, 0 will be reserved for ROOT PlanFragment.
     Integer _currentStageId = 1;
     Integer _previousStageId = 1;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 91e1ba7a89..5ca7de8543 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -29,14 +29,14 @@ import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.physical.DispatchablePlanContext;
 import org.apache.pinot.query.planner.physical.DispatchablePlanVisitor;
 import org.apache.pinot.query.planner.physical.colocated.GreedyShuffleRewriteVisitor;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.routing.WorkerManager;
 
 
 /**
- * QueryPlanMaker walks top-down from {@link RelRoot} and construct a forest of trees with {@link StageNode}.
+ * StagePlanner walks top-down from {@link RelRoot} and constructs a forest of trees with {@link PlanNode}.
  *
  * This class is non-threadsafe. Do not reuse the stage planner for multiple query plans.
  */
@@ -64,19 +64,20 @@ public class StagePlanner {
     RelNode relRootNode = relRoot.rel;
 
     // Walk through RelNode tree and construct a StageNode tree.
-    StageNode globalStageRoot = relNodeToStageNode(relRootNode);
+    PlanNode globalStageRoot = relNodeToStageNode(relRootNode);
 
     // Fragment the stage tree into multiple stages.
     globalStageRoot = globalStageRoot.visit(StageFragmenter.INSTANCE, new StageFragmenter.Context());
 
     // global root needs to send results back to the ROOT, a.k.a. the client response node. the last stage only has one
     // receiver so doesn't matter what the exchange type is. setting it to SINGLETON by default.
-    StageNode globalSenderNode = new MailboxSendNode(globalStageRoot.getStageId(), globalStageRoot.getDataSchema(),
-        0, RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false);
+    PlanNode globalSenderNode =
+        new MailboxSendNode(globalStageRoot.getPlanFragmentId(), globalStageRoot.getDataSchema(),
+            0, RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false);
     globalSenderNode.addInput(globalStageRoot);
 
-    StageNode globalReceiverNode =
-        new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getStageId(),
+    PlanNode globalReceiverNode =
+        new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getPlanFragmentId(),
             RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false, false, globalSenderNode);
 
     // perform physical plan conversion and assign workers to each stage.
@@ -93,13 +94,13 @@ public class StagePlanner {
 
   // non-threadsafe
   // TODO: add dataSchema (extracted from RelNode schema) to the StageNode.
-  private StageNode relNodeToStageNode(RelNode node) {
-    StageNode stageNode = RelToStageConverter.toStageNode(node, -1);
+  private PlanNode relNodeToStageNode(RelNode node) {
+    PlanNode planNode = RelToStageConverter.toStageNode(node, -1);
     List<RelNode> inputs = node.getInputs();
     for (RelNode input : inputs) {
-      stageNode.addInput(relNodeToStageNode(input));
+      planNode.addInput(relNodeToStageNode(input));
     }
-    return stageNode;
+    return planNode;
   }
 
   // TODO: Switch to Worker SPI to avoid multiple-places where workers are assigned.
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
index 227aa5429b..aa587cf974 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanContext.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.calcite.util.Pair;
 import org.apache.pinot.query.context.PlannerContext;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.routing.WorkerManager;
 
 
@@ -37,7 +37,7 @@ public class DispatchablePlanContext {
 
   private final PlannerContext _plannerContext;
   private final Map<Integer, DispatchablePlanMetadata> _dispatchablePlanMetadataMap;
-  private final Map<Integer, StageNode> _dispatchablePlanStageRootMap;
+  private final Map<Integer, PlanNode> _dispatchablePlanStageRootMap;
 
   public DispatchablePlanContext(WorkerManager workerManager, long requestId, PlannerContext plannerContext,
       List<Pair<Integer, String>> resultFields, Set<String> tableNames) {
@@ -75,7 +75,7 @@ public class DispatchablePlanContext {
     return _dispatchablePlanMetadataMap;
   }
 
-  public Map<Integer, StageNode> getDispatchablePlanStageRootMap() {
+  public Map<Integer, PlanNode> getDispatchablePlanStageRootMap() {
     return _dispatchablePlanStageRootMap;
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index 7575c4dc8e..36d677d362 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -29,7 +29,7 @@ import org.apache.pinot.query.routing.QueryServerInstance;
 
 
 /**
- * The {@code StageMetadata} info contains the information for dispatching a particular stage.
+ * The {@code DispatchablePlanMetadata} info contains the information for dispatching a particular plan fragment.
  *
  * <p>It contains information aboute:
  * <ul>
@@ -50,7 +50,7 @@ public class DispatchablePlanMetadata implements Serializable {
   private Map<Integer, Map<String, List<String>>> _workerIdToSegmentsMap;
 
   // used for build mailboxes between workers.
-  // workerId -> {stageId -> mailbox list}
+  // workerId -> {planFragmentId -> mailbox list}
   private Map<Integer, Map<Integer, MailboxMetadata>> _workerIdToMailboxesMap;
 
   // time boundary info
@@ -100,8 +100,9 @@ public class DispatchablePlanMetadata implements Serializable {
     _workerIdToMailboxesMap.putAll(workerIdToMailboxesMap);
   }
 
-  public void addWorkerIdToMailBoxIdsMap(int stageId, Map<Integer, MailboxMetadata> stageIdToMailboxesMap) {
-    _workerIdToMailboxesMap.put(stageId, stageIdToMailboxesMap);
+  public void addWorkerIdToMailBoxIdsMap(int planFragmentId,
+      Map<Integer, MailboxMetadata> planFragmentIdToMailboxesMap) {
+    _workerIdToMailboxesMap.put(planFragmentId, planFragmentIdToMailboxesMap);
   }
 
   public Map<QueryServerInstance, List<Integer>> getServerInstanceToWorkerIdMap() {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index 5258fc5637..964d332127 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -19,23 +19,23 @@
 package org.apache.pinot.query.planner.physical;
 
 import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
-
-
-public class DispatchablePlanVisitor implements StageNodeVisitor<Void, DispatchablePlanContext> {
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+
+
+public class DispatchablePlanVisitor implements PlanNodeVisitor<Void, DispatchablePlanContext> {
   public static final DispatchablePlanVisitor INSTANCE = new DispatchablePlanVisitor();
 
   private DispatchablePlanVisitor() {
@@ -43,17 +43,17 @@ public class DispatchablePlanVisitor implements StageNodeVisitor<Void, Dispatcha
 
   /**
    * Entry point for attaching dispatch metadata to a query plan. It walks through the plan via the global
-   * {@link StageNode} root of the query and:
+   * {@link PlanNode} root of the query and:
    * <ul>
-   *   <li>break down the {@link StageNode}s into Stages that can run on a single worker.</li>
-   *   <li>each stage is represented by a subset of {@link StageNode}s without data exchange.</li>
+   *   <li>break down the {@link PlanNode}s into Stages that can run on a single worker.</li>
+   *   <li>each stage is represented by a subset of {@link PlanNode}s without data exchange.</li>
    *   <li>attach worker execution information including physical server address, worker ID to each stage.</li>
    * </ul>
    *
    * @param globalReceiverNode the entrypoint of the stage plan.
    * @param dispatchablePlanContext dispatchable plan context used to record the walk of the stage node tree.
    */
-  public QueryPlan constructDispatchablePlan(StageNode globalReceiverNode,
+  public QueryPlan constructDispatchablePlan(PlanNode globalReceiverNode,
       DispatchablePlanContext dispatchablePlanContext) {
     // 1. start by visiting the stage root.
     globalReceiverNode.visit(DispatchablePlanVisitor.INSTANCE, dispatchablePlanContext);
@@ -80,16 +80,17 @@ public class DispatchablePlanVisitor implements StageNodeVisitor<Void, Dispatcha
         dispatchablePlanContext.getDispatchablePlanMetadataMap());
   }
 
-  private static DispatchablePlanMetadata getOrCreateDispatchablePlanMetadata(StageNode node,
+  private static DispatchablePlanMetadata getOrCreateDispatchablePlanMetadata(PlanNode node,
       DispatchablePlanContext context) {
-    return context.getDispatchablePlanMetadataMap().computeIfAbsent(node.getStageId(),
+    return context.getDispatchablePlanMetadataMap().computeIfAbsent(node.getPlanFragmentId(),
         (id) -> new DispatchablePlanMetadata());
   }
 
-  private static void computeWorkerAssignment(StageNode node, DispatchablePlanContext context) {
-    int stageId = node.getStageId();
-    context.getWorkerManager().assignWorkerToStage(stageId, context.getDispatchablePlanMetadataMap().get(stageId),
-        context.getRequestId(), context.getPlannerContext().getOptions(), context.getTableNames());
+  private static void computeWorkerAssignment(PlanNode node, DispatchablePlanContext context) {
+    int planFragmentId = node.getPlanFragmentId();
+    context.getWorkerManager()
+        .assignWorkerToStage(planFragmentId, context.getDispatchablePlanMetadataMap().get(planFragmentId),
+            context.getRequestId(), context.getPlannerContext().getOptions(), context.getTableNames());
   }
 
   @Override
@@ -150,7 +151,7 @@ public class DispatchablePlanVisitor implements StageNodeVisitor<Void, Dispatcha
     node.getInputs().get(0).visit(this, context);
     getOrCreateDispatchablePlanMetadata(node, context);
 
-    context.getDispatchablePlanStageRootMap().put(node.getStageId(), node);
+    context.getDispatchablePlanStageRootMap().put(node.getPlanFragmentId(), node);
     computeWorkerAssignment(node, context);
     return null;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index c41423cc04..5b3040fd35 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -21,10 +21,10 @@ package org.apache.pinot.query.planner.physical;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.pinot.query.planner.stage.DefaultPostOrderTraversalVisitor;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.routing.MailboxMetadata;
 import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -34,12 +34,12 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
   public static final MailboxAssignmentVisitor INSTANCE = new MailboxAssignmentVisitor();
 
   @Override
-  public Void process(StageNode node, DispatchablePlanContext context) {
+  public Void process(PlanNode node, DispatchablePlanContext context) {
     if (node instanceof MailboxSendNode || node instanceof MailboxReceiveNode) {
       int receiverStageId =
-          isMailboxReceiveNode(node) ? node.getStageId() : ((MailboxSendNode) node).getReceiverStageId();
+          isMailboxReceiveNode(node) ? node.getPlanFragmentId() : ((MailboxSendNode) node).getReceiverStageId();
       int senderStageId =
-          isMailboxReceiveNode(node) ? ((MailboxReceiveNode) node).getSenderStageId() : node.getStageId();
+          isMailboxReceiveNode(node) ? ((MailboxReceiveNode) node).getSenderStageId() : node.getPlanFragmentId();
       DispatchablePlanMetadata receiverStagePlanMetadata =
           context.getDispatchablePlanMetadataMap().get(receiverStageId);
       DispatchablePlanMetadata senderStagePlanMetadata = context.getDispatchablePlanMetadataMap().get(senderStageId);
@@ -71,20 +71,21 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
     return null;
   }
 
-  private static boolean isMailboxReceiveNode(StageNode node) {
+  private static boolean isMailboxReceiveNode(PlanNode node) {
     return node instanceof MailboxReceiveNode;
   }
 
-  private MailboxMetadata getMailboxMetadata(DispatchablePlanMetadata stagePlanMetadata, int stageId, int workerId) {
+  private MailboxMetadata getMailboxMetadata(DispatchablePlanMetadata dispatchablePlanMetadata, int planFragmentId,
+      int workerId) {
     Map<Integer, Map<Integer, MailboxMetadata>> workerIdToMailBoxIdsMap =
-        stagePlanMetadata.getWorkerIdToMailBoxIdsMap();
+        dispatchablePlanMetadata.getWorkerIdToMailBoxIdsMap();
     if (!workerIdToMailBoxIdsMap.containsKey(workerId)) {
       workerIdToMailBoxIdsMap.put(workerId, new HashMap<>());
     }
-    Map<Integer, MailboxMetadata> stageToMailboxMetadataMap = workerIdToMailBoxIdsMap.get(workerId);
-    if (!stageToMailboxMetadataMap.containsKey(stageId)) {
-      stageToMailboxMetadataMap.put(stageId, new MailboxMetadata());
+    Map<Integer, MailboxMetadata> planFragmentToMailboxMetadataMap = workerIdToMailBoxIdsMap.get(workerId);
+    if (!planFragmentToMailboxMetadataMap.containsKey(planFragmentId)) {
+      planFragmentToMailboxMetadataMap.put(planFragmentId, new MailboxMetadata());
     }
-    return stageToMailboxMetadataMap.get(stageId);
+    return planFragmentToMailboxMetadataMap.get(planFragmentId);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java
index 23a752a991..d8c6283335 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java
@@ -24,21 +24,21 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
 
 
 /**
  * Context used for running the {@link GreedyShuffleRewriteVisitor}.
  */
 class GreedyShuffleRewriteContext {
-  private final Map<Integer, StageNode> _rootStageNode;
-  private final Map<Integer, List<StageNode>> _leafNodes;
+  private final Map<Integer, PlanNode> _rootStageNode;
+  private final Map<Integer, List<PlanNode>> _leafNodes;
   private final Set<Integer> _joinStages;
   private final Set<Integer> _setOpStages;
 
   /**
-   * A map to track the partition keys for the input to the MailboxSendNode of a given stageId. This is needed
+   * A map to track the partition keys for the input to the MailboxSendNode of a given planFragmentId. This is needed
    * because the {@link GreedyShuffleRewriteVisitor} doesn't determine the distribution of the sender if the receiver
    * is a join-stage.
    */
@@ -53,75 +53,75 @@ class GreedyShuffleRewriteContext {
   }
 
   /**
-   * Returns the root StageNode for a given stageId.
+   * Returns the root PlanNode for a given planFragmentId.
    */
-  StageNode getRootStageNode(Integer stageId) {
-    return _rootStageNode.get(stageId);
+  PlanNode getRootStageNode(Integer planFragmentId) {
+    return _rootStageNode.get(planFragmentId);
   }
 
   /**
-   * Sets the root StageNode for a given stageId.
+   * Sets the root PlanNode for a given planFragmentId.
    */
-  void setRootStageNode(Integer stageId, StageNode stageNode) {
-    _rootStageNode.put(stageId, stageNode);
+  void setRootStageNode(Integer planFragmentId, PlanNode planNode) {
+    _rootStageNode.put(planFragmentId, planNode);
   }
 
   /**
-   * Returns all the leaf StageNode for a given stageId.
+   * Returns all the leaf PlanNodes for a given planFragmentId.
    */
-  List<StageNode> getLeafNodes(Integer stageId) {
-    return _leafNodes.get(stageId);
+  List<PlanNode> getLeafNodes(Integer planFragmentId) {
+    return _leafNodes.get(planFragmentId);
   }
 
   /**
-   * Adds a leaf StageNode for a given stageId.
+   * Adds a leaf PlanNode for a given planFragmentId.
    */
-  void addLeafNode(Integer stageId, StageNode stageNode) {
-    _leafNodes.computeIfAbsent(stageId, (x) -> new ArrayList<>()).add(stageNode);
+  void addLeafNode(Integer planFragmentId, PlanNode planNode) {
+    _leafNodes.computeIfAbsent(planFragmentId, (x) -> new ArrayList<>()).add(planNode);
   }
 
   /**
-   * {@link GreedyShuffleRewriteContext} allows checking whether a given stageId has a JoinNode or not. During
-   * pre-computation, this method may be used to mark that the given stageId has a JoinNode.
+   * {@link GreedyShuffleRewriteContext} allows checking whether a given planFragmentId has a JoinNode or not. During
+   * pre-computation, this method may be used to mark that the given planFragmentId has a JoinNode.
    */
-  void markJoinStage(Integer stageId) {
-    _joinStages.add(stageId);
+  void markJoinStage(Integer planFragmentId) {
+    _joinStages.add(planFragmentId);
   }
 
   /**
-   * Returns true if the given stageId has a JoinNode.
+   * Returns true if the given planFragmentId has a JoinNode.
    */
-  boolean isJoinStage(Integer stageId) {
-    return _joinStages.contains(stageId);
+  boolean isJoinStage(Integer planFragmentId) {
+    return _joinStages.contains(planFragmentId);
   }
 
-
   /**
-   * {@link GreedyShuffleRewriteContext} allows checking whether a given stageId has a SetOpNode or not. During
-   * pre-computation, this method may be used to mark that the given stageId has a SetOpNode.
+   * {@link GreedyShuffleRewriteContext} allows checking whether a given planFragmentId has a SetOpNode or not. During
+   * pre-computation, this method may be used to mark that the given planFragmentId has a SetOpNode.
    */
-  void markSetOpStage(Integer stageId) {
-    _setOpStages.add(stageId);
+  void markSetOpStage(Integer planFragmentId) {
+    _setOpStages.add(planFragmentId);
   }
 
   /**
-   * Returns true if the given stageId has a SetOpNode.
+   * Returns true if the given planFragmentId has a SetOpNode.
    */
-  boolean isSetOpStage(Integer stageId) {
-    return _setOpStages.contains(stageId);
+  boolean isSetOpStage(Integer planFragmentId) {
+    return _setOpStages.contains(planFragmentId);
   }
 
   /**
-   * This returns the {@link Set<ColocationKey>} for the input to the {@link MailboxSendNode} of the given stageId.
+   * This returns the {@code Set<ColocationKey>} for the input to the {@link MailboxSendNode} of the given
+   * planFragmentId.
    */
-  Set<ColocationKey> getColocationKeys(Integer stageId) {
-    return _senderInputColocationKeys.get(stageId);
+  Set<ColocationKey> getColocationKeys(Integer planFragmentId) {
+    return _senderInputColocationKeys.get(planFragmentId);
   }
 
   /**
-   * This sets the {@link Set<ColocationKey>} for the input to the {@link MailboxSendNode} of the given stageId.
+   * This sets the {@code Set<ColocationKey>} for the input to the {@link MailboxSendNode} of the given planFragmentId.
    */
-  void setColocationKeys(Integer stageId, Set<ColocationKey> colocationKeys) {
-    _senderInputColocationKeys.put(stageId, colocationKeys);
+  void setColocationKeys(Integer planFragmentId, Set<ColocationKey> colocationKeys) {
+    _senderInputColocationKeys.put(planFragmentId, colocationKeys);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java
index d42fcfa4ce..f8d7e30842 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java
@@ -18,12 +18,12 @@
  */
 package org.apache.pinot.query.planner.physical.colocated;
 
-import org.apache.pinot.query.planner.stage.DefaultPostOrderTraversalVisitor;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
 
 
 /**
@@ -32,44 +32,44 @@ import org.apache.pinot.query.planner.stage.TableScanNode;
 class GreedyShuffleRewritePreComputeVisitor
     extends DefaultPostOrderTraversalVisitor<Integer, GreedyShuffleRewriteContext> {
 
-  static GreedyShuffleRewriteContext preComputeContext(StageNode rootStageNode) {
+  static GreedyShuffleRewriteContext preComputeContext(PlanNode rootPlanNode) {
     GreedyShuffleRewriteContext context = new GreedyShuffleRewriteContext();
-    rootStageNode.visit(new GreedyShuffleRewritePreComputeVisitor(), context);
+    rootPlanNode.visit(new GreedyShuffleRewritePreComputeVisitor(), context);
     return context;
   }
 
   @Override
-  public Integer process(StageNode stageNode, GreedyShuffleRewriteContext context) {
-    int currentStageId = stageNode.getStageId();
-    context.setRootStageNode(currentStageId, stageNode);
+  public Integer process(PlanNode planNode, GreedyShuffleRewriteContext context) {
+    int currentStageId = planNode.getPlanFragmentId();
+    context.setRootStageNode(currentStageId, planNode);
     return 0;
   }
 
   @Override
   public Integer visitJoin(JoinNode joinNode, GreedyShuffleRewriteContext context) {
     super.visitJoin(joinNode, context);
-    context.markJoinStage(joinNode.getStageId());
+    context.markJoinStage(joinNode.getPlanFragmentId());
     return 0;
   }
 
   @Override
-  public Integer visitMailboxReceive(MailboxReceiveNode stageNode, GreedyShuffleRewriteContext context) {
-    super.visitMailboxReceive(stageNode, context);
-    context.addLeafNode(stageNode.getStageId(), stageNode);
+  public Integer visitMailboxReceive(MailboxReceiveNode planNode, GreedyShuffleRewriteContext context) {
+    super.visitMailboxReceive(planNode, context);
+    context.addLeafNode(planNode.getPlanFragmentId(), planNode);
     return 0;
   }
 
   @Override
-  public Integer visitTableScan(TableScanNode stageNode, GreedyShuffleRewriteContext context) {
-    super.visitTableScan(stageNode, context);
-    context.addLeafNode(stageNode.getStageId(), stageNode);
+  public Integer visitTableScan(TableScanNode planNode, GreedyShuffleRewriteContext context) {
+    super.visitTableScan(planNode, context);
+    context.addLeafNode(planNode.getPlanFragmentId(), planNode);
     return 0;
   }
 
   @Override
   public Integer visitSetOp(SetOpNode setOpNode, GreedyShuffleRewriteContext context) {
     super.visitSetOp(setOpNode, context);
-    context.markSetOpStage(setOpNode.getStageId());
+    context.markSetOpStage(setOpNode.getPlanFragmentId());
     return 0;
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
index 6d58bde3af..99e2f2ab68 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
@@ -34,20 +34,20 @@ import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory;
  *
  * Also see: {@link ColocationKey} for its definition.
  */
-public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<ColocationKey>, GreedyShuffleRewriteContext> {
+public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<ColocationKey>, GreedyShuffleRewriteContext> {
   private static final Logger LOGGER = LoggerFactory.getLogger(GreedyShuffleRewriteVisitor.class);
 
   private final TableCache _tableCache;
@@ -75,14 +75,14 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
   private boolean _canSkipShuffleForJoin;
 
   public static void optimizeShuffles(QueryPlan queryPlan, TableCache tableCache) {
-    StageNode rootStageNode = queryPlan.getQueryStageMap().get(0);
+    PlanNode rootPlanNode = queryPlan.getQueryStageMap().get(0);
     Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap = queryPlan.getDispatchablePlanMetadataMap();
-    GreedyShuffleRewriteContext context = GreedyShuffleRewritePreComputeVisitor.preComputeContext(rootStageNode);
-    // This assumes that if stageId(S1) > stageId(S2), then S1 is not an ancestor of S2.
+    GreedyShuffleRewriteContext context = GreedyShuffleRewritePreComputeVisitor.preComputeContext(rootPlanNode);
+    // This assumes that if planFragmentId(S1) > planFragmentId(S2), then S1 is not an ancestor of S2.
     // TODO: If this assumption is wrong, we can compute the reverse topological ordering explicitly.
-    for (int stageId = dispatchablePlanMetadataMap.size() - 1; stageId >= 0; stageId--) {
-      StageNode stageNode = context.getRootStageNode(stageId);
-      stageNode.visit(new GreedyShuffleRewriteVisitor(tableCache, dispatchablePlanMetadataMap), context);
+    for (int planFragmentId = dispatchablePlanMetadataMap.size() - 1; planFragmentId >= 0; planFragmentId--) {
+      PlanNode planNode = context.getRootStageNode(planFragmentId);
+      planNode.visit(new GreedyShuffleRewriteVisitor(tableCache, dispatchablePlanMetadataMap), context);
     }
   }
 
@@ -118,7 +118,8 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
   @Override
   public Set<ColocationKey> visitJoin(JoinNode node, GreedyShuffleRewriteContext context) {
     List<MailboxReceiveNode> innerLeafNodes =
-        context.getLeafNodes(node.getStageId()).stream().map(x -> (MailboxReceiveNode) x).collect(Collectors.toList());
+        context.getLeafNodes(node.getPlanFragmentId()).stream().map(x -> (MailboxReceiveNode) x)
+            .collect(Collectors.toList());
     Preconditions.checkState(innerLeafNodes.size() == 2);
 
     // Multiple checks need to be made to ensure that shuffle can be skipped for a join.
@@ -127,7 +128,8 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
     // Step-2: Only if the servers assigned to both left and right nodes are equal and the servers assigned to the join
     //         stage are a superset of those servers, can we skip shuffles.
     canColocate =
-        canColocate && canServerAssignmentAllowShuffleSkip(node.getStageId(), innerLeafNodes.get(0).getSenderStageId(),
+        canColocate && canServerAssignmentAllowShuffleSkip(node.getPlanFragmentId(),
+            innerLeafNodes.get(0).getSenderStageId(),
             innerLeafNodes.get(1).getSenderStageId());
     // Step-3: For both left/right MailboxReceiveNode/MailboxSendNode pairs, check whether the key partitioning can
     //         allow shuffle skip.
@@ -140,7 +142,7 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
     canColocate = canColocate && checkPartitionScheme(innerLeafNodes.get(0), innerLeafNodes.get(1), context);
     if (canColocate) {
       // If shuffle can be skipped, reassign servers.
-      _dispatchablePlanMetadataMap.get(node.getStageId()).setServerInstanceToWorkerIdMap(
+      _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setServerInstanceToWorkerIdMap(
           _dispatchablePlanMetadataMap.get(innerLeafNodes.get(0).getSenderStageId()).getServerInstanceToWorkerIdMap());
       _canSkipShuffleForJoin = true;
     }
@@ -168,18 +170,19 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
     KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
     Set<ColocationKey> oldColocationKeys = context.getColocationKeys(node.getSenderStageId());
     // If the current stage is not a join-stage, then we already know sender's distribution
-    if (!context.isJoinStage(node.getStageId())) {
+    if (!context.isJoinStage(node.getPlanFragmentId())) {
       if (selector == null) {
         return new HashSet<>();
-      } else if (colocationKeyCondition(oldColocationKeys, selector) && areServersSuperset(node.getStageId(),
+      } else if (colocationKeyCondition(oldColocationKeys, selector) && areServersSuperset(node.getPlanFragmentId(),
           node.getSenderStageId())) {
         node.setExchangeType(RelDistribution.Type.SINGLETON);
-        _dispatchablePlanMetadataMap.get(node.getStageId()).setServerInstanceToWorkerIdMap(
+        _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setServerInstanceToWorkerIdMap(
             _dispatchablePlanMetadataMap.get(node.getSenderStageId()).getServerInstanceToWorkerIdMap());
         return oldColocationKeys;
       }
       // This means we can't skip shuffle and there's a partitioning enforced by receiver.
-      int numPartitions = _dispatchablePlanMetadataMap.get(node.getStageId()).getServerInstanceToWorkerIdMap().size();
+      int numPartitions =
+          _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getServerInstanceToWorkerIdMap().size();
       List<ColocationKey> colocationKeys = ((FieldSelectionKeySelector) selector).getColumnIndices().stream()
           .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList());
       return new HashSet<>(colocationKeys);
@@ -195,7 +198,8 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
       return new HashSet<>();
     }
     // This means we can't skip shuffle and there's a partitioning enforced by receiver.
-    int numPartitions = _dispatchablePlanMetadataMap.get(node.getStageId()).getServerInstanceToWorkerIdMap().size();
+    int numPartitions =
+        _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).getServerInstanceToWorkerIdMap().size();
     List<ColocationKey> colocationKeys = ((FieldSelectionKeySelector) selector).getColumnIndices().stream()
         .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList());
     return new HashSet<>(colocationKeys);
@@ -210,19 +214,19 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
     // If receiver is not a join-stage, then we can determine distribution type now.
     if (!context.isJoinStage(node.getReceiverStageId())) {
       Set<ColocationKey> colocationKeys;
-      if (canSkipShuffleBasic && areServersSuperset(node.getReceiverStageId(), node.getStageId())) {
+      if (canSkipShuffleBasic && areServersSuperset(node.getReceiverStageId(), node.getPlanFragmentId())) {
         // Servers are not re-assigned on sender-side. If needed, they are re-assigned on the receiver side.
         node.setExchangeType(RelDistribution.Type.SINGLETON);
         colocationKeys = oldColocationKeys;
       } else {
         colocationKeys = new HashSet<>();
       }
-      context.setColocationKeys(node.getStageId(), colocationKeys);
+      context.setColocationKeys(node.getPlanFragmentId(), colocationKeys);
       return colocationKeys;
     }
     // If receiver is a join-stage, remember partition-keys of the child node of MailboxSendNode.
     Set<ColocationKey> mailboxSendColocationKeys = canSkipShuffleBasic ? oldColocationKeys : new HashSet<>();
-    context.setColocationKeys(node.getStageId(), mailboxSendColocationKeys);
+    context.setColocationKeys(node.getPlanFragmentId(), mailboxSendColocationKeys);
     return mailboxSendColocationKeys;
   }
 
@@ -371,7 +375,7 @@ public class GreedyShuffleRewriteVisitor implements StageNodeVisitor<Set<Colocat
   private static boolean partitionKeyConditionForJoin(MailboxReceiveNode mailboxReceiveNode,
       MailboxSendNode mailboxSendNode, GreedyShuffleRewriteContext context) {
     // First check ColocationKeyCondition for the sender <--> sender.getInputs().get(0) pair
-    Set<ColocationKey> oldColocationKeys = context.getColocationKeys(mailboxSendNode.getStageId());
+    Set<ColocationKey> oldColocationKeys = context.getColocationKeys(mailboxSendNode.getPlanFragmentId());
     KeySelector<Object[], Object[]> selector = mailboxSendNode.getPartitionKeySelector();
     if (!colocationKeyCondition(oldColocationKeys, selector)) {
       return false;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java
similarity index 72%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java
index c2f0e4b1be..ae696ac7f9 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AbstractPlanNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -26,40 +26,40 @@ import org.apache.pinot.query.planner.serde.ProtoSerializable;
 import org.apache.pinot.query.planner.serde.ProtoSerializationUtils;
 
 
-public abstract class AbstractStageNode implements StageNode, ProtoSerializable {
+public abstract class AbstractPlanNode implements PlanNode, ProtoSerializable {
 
-  protected int _stageId;
-  protected final List<StageNode> _inputs;
+  protected int _planFragmentId;
+  protected final List<PlanNode> _inputs;
   protected DataSchema _dataSchema;
 
-  public AbstractStageNode(int stageId) {
-    this(stageId, null);
+  public AbstractPlanNode(int planFragmentId) {
+    this(planFragmentId, null);
   }
 
-  public AbstractStageNode(int stageId, DataSchema dataSchema) {
-    _stageId = stageId;
+  public AbstractPlanNode(int planFragmentId, DataSchema dataSchema) {
+    _planFragmentId = planFragmentId;
     _dataSchema = dataSchema;
     _inputs = new ArrayList<>();
   }
 
   @Override
-  public int getStageId() {
-    return _stageId;
+  public int getPlanFragmentId() {
+    return _planFragmentId;
   }
 
   @Override
-  public void setStageId(int stageId) {
-    _stageId = stageId;
+  public void setPlanFragmentId(int planFragmentId) {
+    _planFragmentId = planFragmentId;
   }
 
   @Override
-  public List<StageNode> getInputs() {
+  public List<PlanNode> getInputs() {
     return _inputs;
   }
 
   @Override
-  public void addInput(StageNode stageNode) {
-    _inputs.add(stageNode);
+  public void addInput(PlanNode planNode) {
+    _inputs.add(planNode);
   }
 
   @Override
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AggregateNode.java
similarity index 86%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AggregateNode.java
index ed21c674d6..6776d9e769 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/AggregateNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import com.google.common.base.Preconditions;
 import java.util.List;
@@ -29,7 +29,7 @@ import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
 
-public class AggregateNode extends AbstractStageNode {
+public class AggregateNode extends AbstractPlanNode {
   public static final RelHint FINAL_STAGE_HINT = RelHint.builder(
       PinotHintStrategyTable.INTERNAL_AGG_FINAL_STAGE).build();
   public static final RelHint INTERMEDIATE_STAGE_HINT = RelHint.builder(
@@ -41,13 +41,14 @@ public class AggregateNode extends AbstractStageNode {
   @ProtoProperties
   private List<RexExpression> _groupSet;
 
-  public AggregateNode(int stageId) {
-    super(stageId);
+  public AggregateNode(int planFragmentId) {
+    super(planFragmentId);
   }
 
-  public AggregateNode(int stageId, DataSchema dataSchema, List<AggregateCall> aggCalls, List<RexExpression> groupSet,
+  public AggregateNode(int planFragmentId, DataSchema dataSchema, List<AggregateCall> aggCalls,
+      List<RexExpression> groupSet,
       List<RelHint> relHints) {
-    super(stageId, dataSchema);
+    super(planFragmentId, dataSchema);
     _aggCalls = aggCalls.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
     _groupSet = groupSet;
     _relHints = relHints;
@@ -81,7 +82,7 @@ public class AggregateNode extends AbstractStageNode {
   }
 
   @Override
-  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+  public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
     return visitor.visitAggregate(this, context);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/DefaultPostOrderTraversalVisitor.java
similarity index 95%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/DefaultPostOrderTraversalVisitor.java
index 4c93f9d289..e1ca593b02 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/DefaultPostOrderTraversalVisitor.java
@@ -16,15 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 /**
  * A base implementation of a visitor pattern where the children of a given node are visited first and after that the
  * node is processed (post-order traversal).
  */
-public abstract class DefaultPostOrderTraversalVisitor<T, C> implements StageNodeVisitor<T, C> {
+public abstract class DefaultPostOrderTraversalVisitor<T, C> implements PlanNodeVisitor<T, C> {
 
-  public abstract T process(StageNode stageNode, C context);
+  public abstract T process(PlanNode planNode, C context);
 
   @Override
   public T visitAggregate(AggregateNode node, C context) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ExchangeNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
similarity index 90%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ExchangeNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
index 328dd8568c..04adca3eee 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ExchangeNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import java.util.List;
 import org.apache.calcite.rel.RelDistribution;
@@ -29,7 +29,7 @@ import org.apache.pinot.query.planner.serde.ProtoProperties;
  * ExchangeNode represents the exchange stage in the query plan.
  * It is used to exchange the data between the instances.
  */
-public class ExchangeNode extends AbstractStageNode {
+public class ExchangeNode extends AbstractPlanNode {
 
   @ProtoProperties
   private RelDistribution.Type _exchangeType;
@@ -46,8 +46,8 @@ public class ExchangeNode extends AbstractStageNode {
   @ProtoProperties
   private List<RelFieldCollation> _collations;
 
-  public ExchangeNode(int stageId) {
-    super(stageId);
+  public ExchangeNode(int planFragmentId) {
+    super(planFragmentId);
   }
 
   public ExchangeNode(int currentStageId, DataSchema dataSchema, RelDistribution distribution,
@@ -67,7 +67,7 @@ public class ExchangeNode extends AbstractStageNode {
   }
 
   @Override
-  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+  public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
     return visitor.visitExchange(this, context);
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/FilterNode.java
similarity index 86%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/FilterNode.java
index 52ed004da1..15d4ff15c8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/FilterNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import org.apache.calcite.rex.RexNode;
 import org.apache.pinot.common.utils.DataSchema;
@@ -24,12 +24,12 @@ import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
 
-public class FilterNode extends AbstractStageNode {
+public class FilterNode extends AbstractPlanNode {
   @ProtoProperties
   private RexExpression _condition;
 
-  public FilterNode(int stageId) {
-    super(stageId);
+  public FilterNode(int planFragmentId) {
+    super(planFragmentId);
   }
 
   public FilterNode(int currentStageId, DataSchema dataSchema, RexNode condition) {
@@ -47,7 +47,7 @@ public class FilterNode extends AbstractStageNode {
   }
 
   @Override
-  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+  public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
     return visitor.visitFilter(this, context);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
similarity index 88%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
index 3b127d6568..6d089c6239 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import java.util.Arrays;
 import java.util.List;
@@ -28,7 +28,7 @@ import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
 
-public class JoinNode extends AbstractStageNode {
+public class JoinNode extends AbstractPlanNode {
   @ProtoProperties
   private JoinRelType _joinRelType;
   @ProtoProperties
@@ -40,13 +40,13 @@ public class JoinNode extends AbstractStageNode {
   @ProtoProperties
   private List<String> _rightColumnNames;
 
-  public JoinNode(int stageId) {
-    super(stageId);
+  public JoinNode(int planFragmentId) {
+    super(planFragmentId);
   }
 
-  public JoinNode(int stageId, DataSchema dataSchema, DataSchema leftSchema, DataSchema rightSchema,
+  public JoinNode(int planFragmentId, DataSchema dataSchema, DataSchema leftSchema, DataSchema rightSchema,
       JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression> joinClause) {
-    super(stageId, dataSchema);
+    super(planFragmentId, dataSchema);
     _leftColumnNames = Arrays.asList(leftSchema.getColumnNames());
     _rightColumnNames = Arrays.asList(rightSchema.getColumnNames());
     _joinRelType = joinRelType;
@@ -80,7 +80,7 @@ public class JoinNode extends AbstractStageNode {
   }
 
   @Override
-  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+  public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
     return visitor.visitJoin(this, context);
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
similarity index 89%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
index 13eb8b5296..3597651fdd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
@@ -32,7 +32,7 @@ import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
 
-public class MailboxReceiveNode extends AbstractStageNode {
+public class MailboxReceiveNode extends AbstractPlanNode {
   @ProtoProperties
   private int _senderStageId;
   @ProtoProperties
@@ -50,17 +50,17 @@ public class MailboxReceiveNode extends AbstractStageNode {
 
   // this is only available during planning and should not be relied
   // on in any post-serialization code
-  private transient StageNode _sender;
+  private transient PlanNode _sender;
 
-  public MailboxReceiveNode(int stageId) {
-    super(stageId);
+  public MailboxReceiveNode(int planFragmentId) {
+    super(planFragmentId);
   }
 
-  public MailboxReceiveNode(int stageId, DataSchema dataSchema, int senderStageId,
+  public MailboxReceiveNode(int planFragmentId, DataSchema dataSchema, int senderStageId,
       RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
       @Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender, boolean isSortOnReceiver,
-      StageNode sender) {
-    super(stageId, dataSchema);
+      PlanNode sender) {
+    super(planFragmentId, dataSchema);
     _senderStageId = senderStageId;
     _exchangeType = exchangeType;
     _partitionKeySelector = partitionKeySelector;
@@ -117,7 +117,7 @@ public class MailboxReceiveNode extends AbstractStageNode {
     return _isSortOnReceiver;
   }
 
-  public StageNode getSender() {
+  public PlanNode getSender() {
     return _sender;
   }
 
@@ -127,7 +127,7 @@ public class MailboxReceiveNode extends AbstractStageNode {
   }
 
   @Override
-  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+  public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
     return visitor.visitMailboxReceive(this, context);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
similarity index 90%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
index 54f80c6794..08624959b7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
@@ -32,7 +32,7 @@ import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
 
-public class MailboxSendNode extends AbstractStageNode {
+public class MailboxSendNode extends AbstractPlanNode {
   @ProtoProperties
   private int _receiverStageId;
   @ProtoProperties
@@ -46,14 +46,14 @@ public class MailboxSendNode extends AbstractStageNode {
   @ProtoProperties
   private boolean _isSortOnSender;
 
-  public MailboxSendNode(int stageId) {
-    super(stageId);
+  public MailboxSendNode(int planFragmentId) {
+    super(planFragmentId);
   }
 
-  public MailboxSendNode(int stageId, DataSchema dataSchema, int receiverStageId,
+  public MailboxSendNode(int planFragmentId, DataSchema dataSchema, int receiverStageId,
       RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
       @Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender) {
-    super(stageId, dataSchema);
+    super(planFragmentId, dataSchema);
     _receiverStageId = receiverStageId;
     _exchangeType = exchangeType;
     _partitionKeySelector = partitionKeySelector;
@@ -111,7 +111,7 @@ public class MailboxSendNode extends AbstractStageNode {
   }
 
   @Override
-  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+  public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
     return visitor.visitMailboxSend(this, context);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNode.java
similarity index 71%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNode.java
index ae851f449b..1e6aa32d0e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import java.io.Serializable;
 import java.util.List;
@@ -24,22 +24,22 @@ import org.apache.pinot.common.utils.DataSchema;
 
 
 /**
- * Stage Node is a serializable version of the {@link org.apache.calcite.rel.RelNode}.
+ * PlanNode is a serializable version of the {@link org.apache.calcite.rel.RelNode}.
  *
- * TODO: stage node currently uses java.io.Serializable as its serialization format.
+ * TODO: PlanNode currently uses java.io.Serializable as its serialization format.
  * We should experiment with other type of serialization format for better performance.
  * Essentially what we need is a way to exclude the planner context from the RelNode but only keeps the
- * constructed relational content because we will no longer revisit the planner after stage is created.
+ * constructed relational content because we will no longer revisit the planner after PlanFragment is created.
  */
-public interface StageNode extends Serializable {
+public interface PlanNode extends Serializable {
 
-  int getStageId();
+  int getPlanFragmentId();
 
-  void setStageId(int stageId);
+  void setPlanFragmentId(int planFragmentId);
 
-  List<StageNode> getInputs();
+  List<PlanNode> getInputs();
 
-  void addInput(StageNode stageNode);
+  void addInput(PlanNode planNode);
 
   DataSchema getDataSchema();
 
@@ -47,5 +47,5 @@ public interface StageNode extends Serializable {
 
   String explain();
 
-  <T, C> T visit(StageNodeVisitor<T, C> visitor, C context);
+  <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context);
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java
similarity index 79%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java
index f72e9540ec..d917cab021 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/PlanNodeVisitor.java
@@ -16,26 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
+import org.apache.pinot.query.planner.ExplainPlanPlanVisitor;
 import org.apache.pinot.query.planner.QueryPlan;
 
 
 /**
- * {@code StageNodeVisitor} is a skeleton class that allows for implementations of {@code StageNode}
- * tree traversals using the {@link StageNode#visit(StageNodeVisitor, Object)} method. There is no
+ * {@code PlanNodeVisitor} is a skeleton class that allows for implementations of {@code PlanNode}
+ * tree traversals using the {@link PlanNode#visit(PlanNodeVisitor, Object)} method. There is no
  * enforced traversal order, and should be implemented by subclasses.
  *
  * <p>It is recommended that implementors use private constructors and static methods to access main
- * functionality (see {@link org.apache.pinot.query.planner.ExplainPlanStageVisitor#explain(QueryPlan)}
+ * functionality (see {@link ExplainPlanPlanVisitor#explain(QueryPlan)}
  * as an example of a usage of this pattern.
  *
- * @param <T> the return type for all visits
- * @param <C> a Context that will be passed as the second parameter to {@code StageNode#visit},
+ * @param <T> the return type for all visits
+ * @param <C> a Context that will be passed as the second parameter to {@code PlanNode#visit},
  *            implementors can decide how they want to use this context (e.g. whether or not
  *            it can be modified in place or whether it's an immutable context)
  */
-public interface StageNodeVisitor<T, C> {
+public interface PlanNodeVisitor<T, C> {
 
   T visitAggregate(AggregateNode node, C context);
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ProjectNode.java
similarity index 86%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ProjectNode.java
index 8371dda609..9333e89893 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ProjectNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -26,12 +26,12 @@ import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
 
-public class ProjectNode extends AbstractStageNode {
+public class ProjectNode extends AbstractPlanNode {
   @ProtoProperties
   private List<RexExpression> _projects;
 
-  public ProjectNode(int stageId) {
-    super(stageId);
+  public ProjectNode(int planFragmentId) {
+    super(planFragmentId);
   }
   public ProjectNode(int currentStageId, DataSchema dataSchema, List<RexNode> projects) {
     super(currentStageId, dataSchema);
@@ -48,7 +48,7 @@ public class ProjectNode extends AbstractStageNode {
   }
 
   @Override
-  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+  public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
     return visitor.visitProject(this, context);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SetOpNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
similarity index 84%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SetOpNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
index aa7003449f..1eefcb93de 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SetOpNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SetOpNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.logical.LogicalIntersect;
@@ -29,7 +29,7 @@ import org.apache.pinot.query.planner.serde.ProtoProperties;
 /**
  * Set operation node is used to represent UNION, INTERSECT, EXCEPT.
  */
-public class SetOpNode extends AbstractStageNode {
+public class SetOpNode extends AbstractPlanNode {
 
   @ProtoProperties
   private SetOpType _setOpType;
@@ -37,12 +37,12 @@ public class SetOpNode extends AbstractStageNode {
   @ProtoProperties
   private boolean _all;
 
-  public SetOpNode(int stageId) {
-    super(stageId);
+  public SetOpNode(int planFragmentId) {
+    super(planFragmentId);
   }
 
-  public SetOpNode(SetOpType setOpType, int stageId, DataSchema dataSchema, boolean all) {
-    super(stageId, dataSchema);
+  public SetOpNode(SetOpType setOpType, int planFragmentId, DataSchema dataSchema, boolean all) {
+    super(planFragmentId, dataSchema);
     _setOpType = setOpType;
     _all = all;
   }
@@ -61,7 +61,7 @@ public class SetOpNode extends AbstractStageNode {
   }
 
   @Override
-  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+  public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
     return visitor.visitSetOp(this, context);
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java
similarity index 84%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java
index 4ffe901b20..6bc91430f3 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -26,7 +26,7 @@ import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
 
-public class SortNode extends AbstractStageNode {
+public class SortNode extends AbstractPlanNode {
   @ProtoProperties
   private List<RexExpression> _collationKeys;
   @ProtoProperties
@@ -36,12 +36,13 @@ public class SortNode extends AbstractStageNode {
   @ProtoProperties
   private int _offset;
 
-  public SortNode(int stageId) {
-    super(stageId);
+  public SortNode(int planFragmentId) {
+    super(planFragmentId);
   }
 
-  public SortNode(int stageId, List<RelFieldCollation> fieldCollations, int fetch, int offset, DataSchema dataSchema) {
-    super(stageId, dataSchema);
+  public SortNode(int planFragmentId, List<RelFieldCollation> fieldCollations, int fetch, int offset,
+      DataSchema dataSchema) {
+    super(planFragmentId, dataSchema);
     _collationDirections = new ArrayList<>(fieldCollations.size());
     _collationKeys = new ArrayList<>(fieldCollations.size());
     _fetch = fetch;
@@ -76,7 +77,7 @@ public class SortNode extends AbstractStageNode {
   }
 
   @Override
-  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+  public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
     return visitor.visitSort(this, context);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/StageNodeSerDeUtils.java
similarity index 63%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/StageNodeSerDeUtils.java
index f96eadfc06..e803d4beee 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/StageNodeSerDeUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import org.apache.pinot.common.proto.Plan;
 import org.apache.pinot.common.utils.DataSchema;
@@ -27,28 +27,28 @@ public final class StageNodeSerDeUtils {
     // do not instantiate.
   }
 
-  public static AbstractStageNode deserializeStageNode(Plan.StageNode protoNode) {
-    AbstractStageNode stageNode = newNodeInstance(protoNode.getNodeName(), protoNode.getStageId());
-    stageNode.setDataSchema(extractDataSchema(protoNode));
-    stageNode.fromObjectField(protoNode.getObjectField());
+  public static AbstractPlanNode deserializeStageNode(Plan.StageNode protoNode) {
+    AbstractPlanNode planNode = newNodeInstance(protoNode.getNodeName(), protoNode.getStageId());
+    planNode.setDataSchema(extractDataSchema(protoNode));
+    planNode.fromObjectField(protoNode.getObjectField());
     for (Plan.StageNode protoChild : protoNode.getInputsList()) {
-      stageNode.addInput(deserializeStageNode(protoChild));
+      planNode.addInput(deserializeStageNode(protoChild));
     }
-    return stageNode;
+    return planNode;
   }
 
-  public static Plan.StageNode serializeStageNode(AbstractStageNode stageNode) {
+  public static Plan.StageNode serializeStageNode(AbstractPlanNode planNode) {
     Plan.StageNode.Builder builder = Plan.StageNode.newBuilder()
-        .setStageId(stageNode.getStageId())
-        .setNodeName(stageNode.getClass().getSimpleName())
-        .setObjectField(stageNode.toObjectField());
-    DataSchema dataSchema = stageNode.getDataSchema();
+        .setStageId(planNode.getPlanFragmentId())
+        .setNodeName(planNode.getClass().getSimpleName())
+        .setObjectField(planNode.toObjectField());
+    DataSchema dataSchema = planNode.getDataSchema();
     for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
       builder.addColumnNames(dataSchema.getColumnName(i));
       builder.addColumnDataTypes(dataSchema.getColumnDataType(i).name());
     }
-    for (StageNode childNode : stageNode.getInputs()) {
-      builder.addInputs(serializeStageNode((AbstractStageNode) childNode));
+    for (PlanNode childNode : planNode.getInputs()) {
+      builder.addInputs(serializeStageNode((AbstractPlanNode) childNode));
     }
     return builder.build();
   }
@@ -63,30 +63,30 @@ public final class StageNodeSerDeUtils {
     return new DataSchema(columnNames, columnDataTypes);
   }
 
-  private static AbstractStageNode newNodeInstance(String nodeName, int stageId) {
+  private static AbstractPlanNode newNodeInstance(String nodeName, int planFragmentId) {
     switch (nodeName) {
       case "TableScanNode":
-        return new TableScanNode(stageId);
+        return new TableScanNode(planFragmentId);
       case "JoinNode":
-        return new JoinNode(stageId);
+        return new JoinNode(planFragmentId);
       case "ProjectNode":
-        return new ProjectNode(stageId);
+        return new ProjectNode(planFragmentId);
       case "FilterNode":
-        return new FilterNode(stageId);
+        return new FilterNode(planFragmentId);
       case "AggregateNode":
-        return new AggregateNode(stageId);
+        return new AggregateNode(planFragmentId);
       case "SortNode":
-        return new SortNode(stageId);
+        return new SortNode(planFragmentId);
       case "MailboxSendNode":
-        return new MailboxSendNode(stageId);
+        return new MailboxSendNode(planFragmentId);
       case "MailboxReceiveNode":
-        return new MailboxReceiveNode(stageId);
+        return new MailboxReceiveNode(planFragmentId);
       case "ValueNode":
-        return new ValueNode(stageId);
+        return new ValueNode(planFragmentId);
       case "WindowNode":
-        return new WindowNode(stageId);
+        return new WindowNode(planFragmentId);
       case "SetOpNode":
-        return new SetOpNode(stageId);
+        return new SetOpNode(planFragmentId);
       case "ExchangeNode":
         throw new IllegalArgumentException(
             "ExchangeNode should be already split into MailboxSendNode and MailboxReceiveNode");
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/TableScanNode.java
similarity index 78%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/TableScanNode.java
index 7711dd6235..a9c92ca387 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/TableScanNode.java
@@ -16,25 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import java.util.List;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
 
-public class TableScanNode extends AbstractStageNode {
+public class TableScanNode extends AbstractPlanNode {
   @ProtoProperties
   private String _tableName;
   @ProtoProperties
   private List<String> _tableScanColumns;
 
-  public TableScanNode(int stageId) {
-    super(stageId);
+  public TableScanNode(int planFragmentId) {
+    super(planFragmentId);
   }
 
-  public TableScanNode(int stageId, DataSchema dataSchema, String tableName, List<String> tableScanColumns) {
-    super(stageId, dataSchema);
+  public TableScanNode(int planFragmentId, DataSchema dataSchema, String tableName, List<String> tableScanColumns) {
+    super(planFragmentId, dataSchema);
     _tableName = tableName;
     _tableScanColumns = tableScanColumns;
   }
@@ -53,7 +53,7 @@ public class TableScanNode extends AbstractStageNode {
   }
 
   @Override
-  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+  public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
     return visitor.visitTableScan(this, context);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java
similarity index 88%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java
index b3ad0d40f6..248af5d8a2 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ValueNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
@@ -27,12 +27,12 @@ import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
 
-public class ValueNode extends AbstractStageNode {
+public class ValueNode extends AbstractPlanNode {
   @ProtoProperties
   private List<List<RexExpression>> _literalRows;
 
-  public ValueNode(int stageId) {
-    super(stageId);
+  public ValueNode(int planFragmentId) {
+    super(planFragmentId);
   }
 
   public ValueNode(int currentStageId, DataSchema dataSchema,
@@ -58,7 +58,7 @@ public class ValueNode extends AbstractStageNode {
   }
 
   @Override
-  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+  public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
     return visitor.visitValue(this, context);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/WindowNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
similarity index 92%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/WindowNode.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
index 8e122af119..810859ae22 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/WindowNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/WindowNode.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import com.clearspring.analytics.util.Preconditions;
 import java.util.ArrayList;
@@ -31,7 +31,7 @@ import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
 
-public class WindowNode extends AbstractStageNode {
+public class WindowNode extends AbstractPlanNode {
   @ProtoProperties
   private List<RexExpression> _groupSet;
   @ProtoProperties
@@ -61,12 +61,13 @@ public class WindowNode extends AbstractStageNode {
     RANGE
   }
 
-  public WindowNode(int stageId) {
-    super(stageId);
+  public WindowNode(int planFragmentId) {
+    super(planFragmentId);
   }
 
-  public WindowNode(int stageId, List<Window.Group> windowGroups, List<RexLiteral> constants, DataSchema dataSchema) {
-    super(stageId, dataSchema);
+  public WindowNode(int planFragmentId, List<Window.Group> windowGroups, List<RexLiteral> constants,
+      DataSchema dataSchema) {
+    super(planFragmentId, dataSchema);
     // Only a single Window Group should exist per WindowNode.
     Preconditions.checkState(windowGroups.size() == 1,
         String.format("Only a single window group is allowed! Number of window groups: %d", windowGroups.size()));
@@ -110,7 +111,7 @@ public class WindowNode extends AbstractStageNode {
   }
 
   @Override
-  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+  public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
     return visitor.visitWindow(this, context);
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoProperties.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoProperties.java
index 5a10b91941..b3057db532 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoProperties.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/ProtoProperties.java
@@ -28,11 +28,12 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import org.apache.pinot.query.planner.plannode.PlanNode;
 
 
 /**
  * Annotation {@code ProtoProperties} indicates whether a field defined in a
- * {@link org.apache.pinot.query.planner.stage.StageNode} should be serialized.
+ * {@link PlanNode} should be serialized.
  */
 @Target({ElementType.ANNOTATION_TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
 @Retention(RetentionPolicy.RUNTIME)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/PlanFragmentMetadata.java
similarity index 84%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
rename to pinot-query-planner/src/main/java/org/apache/pinot/query/routing/PlanFragmentMetadata.java
index 4fc6c2ef50..f1da239417 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/PlanFragmentMetadata.java
@@ -25,13 +25,13 @@ import org.apache.pinot.core.routing.TimeBoundaryInfo;
 
 
 /**
- * {@code StageMetadata} is used to send stage-level info about how to execute a stage physically.
+ * {@code PlanFragmentMetadata} is used to send fragment-level info about how to physically execute a plan fragment.
  */
-public class StageMetadata {
+public class PlanFragmentMetadata {
   private final List<WorkerMetadata> _workerMetadataList;
   private final Map<String, String> _customProperties;
 
-  public StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
+  public PlanFragmentMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) {
     _workerMetadataList = workerMetadataList;
     _customProperties = customProperties;
   }
@@ -71,8 +71,8 @@ public class StageMetadata {
       return this;
     }
 
-    public StageMetadata build() {
-      return new StageMetadata(_workerMetadataList, _customProperties);
+    public PlanFragmentMetadata build() {
+      return new PlanFragmentMetadata(_workerMetadataList, _customProperties);
     }
 
     public void putAllCustomProperties(Map<String, String> customPropertyMap) {
@@ -80,11 +80,11 @@ public class StageMetadata {
     }
   }
 
-  public static String getTableName(StageMetadata metadata) {
+  public static String getTableName(PlanFragmentMetadata metadata) {
     return metadata.getCustomProperties().get(Builder.TABLE_NAME_KEY);
   }
 
-  public static TimeBoundaryInfo getTimeBoundary(StageMetadata metadata) {
+  public static TimeBoundaryInfo getTimeBoundary(PlanFragmentMetadata metadata) {
     String timeColumn = metadata.getCustomProperties().get(Builder.TIME_BOUNDARY_COLUMN_KEY);
     String timeValue = metadata.getCustomProperties().get(Builder.TIME_BOUNDARY_VALUE_KEY);
     return timeColumn != null && timeValue != null ? new TimeBoundaryInfo(timeColumn, timeValue) : null;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index c1193a9c5b..7ebc6e96d8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -62,9 +62,9 @@ public class WorkerManager {
     _routingManager = routingManager;
   }
 
-  public void assignWorkerToStage(int stageId, DispatchablePlanMetadata dispatchablePlanMetadata, long requestId,
+  public void assignWorkerToStage(int planFragmentId, DispatchablePlanMetadata dispatchablePlanMetadata, long requestId,
       Map<String, String> options, Set<String> tableNames) {
-    if (PlannerUtils.isRootStage(stageId)) {
+    if (PlannerUtils.isRootPlanFragment(planFragmentId)) {
       // --- ROOT STAGE / BROKER REDUCE STAGE ---
       // ROOT stage doesn't have a QueryServer as it is strictly only reducing results.
       // here we simply assign the worker instance with identical server/mailbox port number.
@@ -177,7 +177,7 @@ public class WorkerManager {
   }
 
   /**
-   * Acquire routing table for items listed in {@link org.apache.pinot.query.planner.stage.TableScanNode}.
+   * Acquire routing table for items listed in {@link org.apache.pinot.query.planner.plannode.TableScanNode}.
    *
    * @param logicalTableName it can either be a hybrid table name or a physical table name with table type.
    * @return keyed-map from table type(s) to routing table(s).
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
index 5295782229..9d92bfb697 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerMetadata.java
@@ -85,8 +85,8 @@ public class WorkerMetadata {
       return this;
     }
 
-    public Builder addMailBoxInfoMap(Integer stageId, MailboxMetadata mailBoxMetadata) {
-      _mailBoxInfosMap.put(stageId, mailBoxMetadata);
+    public Builder addMailBoxInfoMap(Integer planFragmentId, MailboxMetadata mailBoxMetadata) {
+      _mailBoxInfosMap.put(planFragmentId, mailBoxMetadata);
       return this;
     }
 
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 6d6c4937a0..41a88ef1b6 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -28,17 +28,17 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.query.planner.ExplainPlanStageVisitor;
+import org.apache.pinot.query.planner.ExplainPlanPlanVisitor;
 import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.stage.AbstractStageNode;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -78,10 +78,11 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
     }
   }
 
-  private static void assertGroupBySingletonAfterJoin(QueryPlan queryPlan, boolean shouldRewrite) throws Exception {
+  private static void assertGroupBySingletonAfterJoin(QueryPlan queryPlan, boolean shouldRewrite)
+      throws Exception {
     for (Map.Entry<Integer, DispatchablePlanMetadata> e : queryPlan.getDispatchablePlanMetadataMap().entrySet()) {
-      if (e.getValue().getScannedTables().size() == 0 && !PlannerUtils.isRootStage(e.getKey())) {
-        StageNode node = queryPlan.getQueryStageMap().get(e.getKey());
+      if (e.getValue().getScannedTables().size() == 0 && !PlannerUtils.isRootPlanFragment(e.getKey())) {
+        PlanNode node = queryPlan.getQueryStageMap().get(e.getKey());
         while (node != null) {
           if (node instanceof JoinNode) {
             // JOIN is exchanged with hash distribution (data shuffle)
@@ -120,22 +121,22 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
         // table scan stages; for tableA it should have 2 hosts, for tableB it should have only 1
         Assert.assertEquals(
             e.getValue().getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(ExplainPlanStageVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+                .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
                 .collect(Collectors.toSet()),
             tables.get(0).equals("a") ? ImmutableList.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]")
                 : ImmutableList.of("localhost@{1,1}|[0]"));
-      } else if (!PlannerUtils.isRootStage(e.getKey())) {
+      } else if (!PlannerUtils.isRootPlanFragment(e.getKey())) {
         // join stage should have both servers used.
         Assert.assertEquals(
             e.getValue().getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(ExplainPlanStageVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+                .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
                 .collect(Collectors.toSet()),
             ImmutableSet.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]"));
       } else {
         // reduce stage should have the reducer instance.
         Assert.assertEquals(
             e.getValue().getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(ExplainPlanStageVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+                .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
                 .collect(Collectors.toSet()),
             ImmutableSet.of("localhost@{3,3}|[0]"));
       }
@@ -147,12 +148,12 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
     String query = "SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON a.col1 = b.col2 "
         + "WHERE a.col3 >= 0 AND a.col2 IN ('b') AND b.col3 < 0";
     QueryPlan queryPlan = _queryEnvironment.planQuery(query);
-    List<StageNode> intermediateStageRoots =
+    List<PlanNode> intermediateStageRoots =
         queryPlan.getDispatchablePlanMetadataMap().entrySet().stream()
             .filter(e -> e.getValue().getScannedTables().size() == 0)
             .map(e -> queryPlan.getQueryStageMap().get(e.getKey())).collect(Collectors.toList());
     // Assert that no project of filter node for any intermediate stage because all should've been pushed down.
-    for (StageNode roots : intermediateStageRoots) {
+    for (PlanNode roots : intermediateStageRoots) {
       assertNodeTypeNotIn(roots, ImmutableList.of(ProjectNode.class, FilterNode.class));
     }
   }
@@ -162,21 +163,24 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
     String query = "SELECT * FROM d_OFFLINE";
     QueryPlan queryPlan = _queryEnvironment.planQuery(query);
     List<DispatchablePlanMetadata> tableScanMetadataList = queryPlan.getDispatchablePlanMetadataMap().values().stream()
-        .filter(stageMetadata -> stageMetadata.getScannedTables().size() != 0).collect(Collectors.toList());
+        .filter(planFragmentMetadata -> planFragmentMetadata.getScannedTables().size() != 0)
+        .collect(Collectors.toList());
     Assert.assertEquals(tableScanMetadataList.size(), 1);
     Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2);
 
     query = "SELECT * FROM d_REALTIME";
     queryPlan = _queryEnvironment.planQuery(query);
     tableScanMetadataList = queryPlan.getDispatchablePlanMetadataMap().values().stream()
-        .filter(stageMetadata -> stageMetadata.getScannedTables().size() != 0).collect(Collectors.toList());
+        .filter(planFragmentMetadata -> planFragmentMetadata.getScannedTables().size() != 0)
+        .collect(Collectors.toList());
     Assert.assertEquals(tableScanMetadataList.size(), 1);
     Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 1);
 
     query = "SELECT * FROM d";
     queryPlan = _queryEnvironment.planQuery(query);
     tableScanMetadataList = queryPlan.getDispatchablePlanMetadataMap().values().stream()
-        .filter(stageMetadata -> stageMetadata.getScannedTables().size() != 0).collect(Collectors.toList());
+        .filter(planFragmentMetadata -> planFragmentMetadata.getScannedTables().size() != 0)
+        .collect(Collectors.toList());
     Assert.assertEquals(tableScanMetadataList.size(), 1);
     Assert.assertEquals(tableScanMetadataList.get(0).getServerInstanceToWorkerIdMap().size(), 2);
   }
@@ -245,13 +249,13 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
       if (tables.size() != 0) {
         // table scan stages; for tableB it should have only 1
         Assert.assertEquals(e.getValue().getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(ExplainPlanStageVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+                .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
                 .collect(Collectors.toSet()),
             ImmutableList.of("localhost@{1,1}|[0]"));
-      } else if (!PlannerUtils.isRootStage(e.getKey())) {
+      } else if (!PlannerUtils.isRootPlanFragment(e.getKey())) {
         // join stage should have both servers used.
         Assert.assertEquals(e.getValue().getServerInstanceToWorkerIdMap().entrySet().stream()
-                .map(ExplainPlanStageVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+                .map(ExplainPlanPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
                 .collect(Collectors.toSet()),
             ImmutableList.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]"));
       }
@@ -262,16 +266,16 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
   // Test Utils.
   // --------------------------------------------------------------------------
 
-  private static void assertNodeTypeNotIn(StageNode node, List<Class<? extends AbstractStageNode>> bannedNodeType) {
+  private static void assertNodeTypeNotIn(PlanNode node, List<Class<? extends AbstractPlanNode>> bannedNodeType) {
     Assert.assertFalse(isOneOf(bannedNodeType, node));
-    for (StageNode child : node.getInputs()) {
+    for (PlanNode child : node.getInputs()) {
       assertNodeTypeNotIn(child, bannedNodeType);
     }
   }
 
-  private static boolean isOneOf(List<Class<? extends AbstractStageNode>> allowedNodeTypes,
-      StageNode node) {
-    for (Class<? extends AbstractStageNode> allowedNodeType : allowedNodeTypes) {
+  private static boolean isOneOf(List<Class<? extends AbstractPlanNode>> allowedNodeTypes,
+      PlanNode node) {
+    for (Class<? extends AbstractPlanNode> allowedNodeType : allowedNodeTypes) {
       if (node.getClass() == allowedNodeType) {
         return true;
       }
@@ -281,7 +285,7 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
 
   @DataProvider(name = "testQueryExceptionDataProvider")
   private Object[][] provideQueriesWithException() {
-    return new Object[][] {
+    return new Object[][]{
         // wrong table is being used after JOIN
         new Object[]{"SELECT b.col1 - a.col3 FROM a JOIN c ON a.col1 = c.col3", "Table 'b' not found"},
         // non-agg column not being grouped
@@ -291,8 +295,10 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
         // AT TIME ZONE should fail
         new Object[]{"SELECT a.col1 AT TIME ZONE 'PST' FROM a", "No match found for function signature AT_TIME_ZONE"},
         // CASE WHEN with non-consolidated result type at compile time.
-        new Object[]{"SELECT SUM(CASE WHEN col3 > 10 THEN 1 WHEN col3 > 20 THEN 2 WHEN col3 > 30 THEN 3 "
-            + "WHEN col3 > 40 THEN 4 WHEN col3 > 50 THEN '5' ELSE 0 END) FROM a", "while converting CASE WHEN"},
+        new Object[]{
+            "SELECT SUM(CASE WHEN col3 > 10 THEN 1 WHEN col3 > 20 THEN 2 WHEN col3 > 30 THEN 3 "
+                + "WHEN col3 > 40 THEN 4 WHEN col3 > 50 THEN '5' ELSE 0 END) FROM a", "while converting CASE WHEN"
+        },
     };
   }
 
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/stage/SerDeUtilsTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/plannode/SerDeUtilsTest.java
similarity index 82%
rename from pinot-query-planner/src/test/java/org/apache/pinot/query/planner/stage/SerDeUtilsTest.java
rename to pinot-query-planner/src/test/java/org/apache/pinot/query/planner/plannode/SerDeUtilsTest.java
index 7bd9fcb6ad..111c83bd66 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/stage/SerDeUtilsTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/plannode/SerDeUtilsTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.stage;
+package org.apache.pinot.query.planner.plannode;
 
 import java.lang.reflect.Field;
 import java.util.List;
@@ -35,13 +35,13 @@ public class SerDeUtilsTest extends QueryEnvironmentTestBase {
   public void testQueryStagePlanSerDe(String query)
       throws Exception {
     QueryPlan queryPlan = _queryEnvironment.planQuery(query);
-    for (StageNode stageNode : queryPlan.getQueryStageMap().values()) {
-      Plan.StageNode serializedStageNode = StageNodeSerDeUtils.serializeStageNode((AbstractStageNode) stageNode);
-      StageNode deserializedStageNode = StageNodeSerDeUtils.deserializeStageNode(serializedStageNode);
-      Assert.assertTrue(isObjectEqual(stageNode, deserializedStageNode));
-      Assert.assertEquals(deserializedStageNode.getStageId(), stageNode.getStageId());
-      Assert.assertEquals(deserializedStageNode.getDataSchema(), stageNode.getDataSchema());
-      Assert.assertEquals(deserializedStageNode.getInputs().size(), stageNode.getInputs().size());
+    for (PlanNode planNode : queryPlan.getQueryStageMap().values()) {
+      Plan.StageNode serializedStageNode = StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) planNode);
+      PlanNode deserializedPlanNode = StageNodeSerDeUtils.deserializeStageNode(serializedStageNode);
+      Assert.assertTrue(isObjectEqual(planNode, deserializedPlanNode));
+      Assert.assertEquals(deserializedPlanNode.getPlanFragmentId(), planNode.getPlanFragmentId());
+      Assert.assertEquals(deserializedPlanNode.getDataSchema(), planNode.getDataSchema());
+      Assert.assertEquals(deserializedPlanNode.getInputs().size(), planNode.getInputs().size());
     }
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index b1e6bc8c58..6f1e0618c7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -40,9 +40,9 @@ import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -170,9 +170,9 @@ public class QueryRunner {
     if (isLeafStage(distributedStagePlan)) {
       runLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs, requestId);
     } else {
-      StageNode stageRoot = distributedStagePlan.getStageRoot();
+      PlanNode stageRoot = distributedStagePlan.getStageRoot();
       OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
-          new PlanRequestContext(_mailboxService, requestId, stageRoot.getStageId(), timeoutMs, deadlineMs,
+          new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
               distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled));
       _scheduler.register(rootOperator);
     }
@@ -222,7 +222,7 @@ public class QueryRunner {
               + (System.currentTimeMillis() - leafStageStartMillis) + " ms");
       MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
       OpChainExecutionContext opChainExecutionContext =
-          new OpChainExecutionContext(_mailboxService, requestId, sendNode.getStageId(),
+          new OpChainExecutionContext(_mailboxService, requestId, sendNode.getPlanFragmentId(),
               distributedStagePlan.getServer(), timeoutMs, deadlineMs, distributedStagePlan.getStageMetadata(),
               isTraceEnabled);
       MultiStageOperator leafStageOperator =
@@ -247,9 +247,9 @@ public class QueryRunner {
   private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
       Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord> helixPropertyStore,
       MailboxService mailboxService, long deadlineMs) {
-    StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
+    PlanFragmentMetadata planFragmentMetadata = distributedStagePlan.getStageMetadata();
     WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
-    String rawTableName = StageMetadata.getTableName(stageMetadata);
+    String rawTableName = PlanFragmentMetadata.getTableName(planFragmentMetadata);
     Map<String, List<String>> tableToSegmentListMap = WorkerMetadata.getTableSegmentsMap(workerMetadata);
     List<ServerPlanRequestContext> requests = new ArrayList<>();
     for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
@@ -263,7 +263,7 @@ public class QueryRunner {
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
         requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap,
-            tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE,
+            tableConfig, schema, PlanFragmentMetadata.getTimeBoundary(planFragmentMetadata), TableType.OFFLINE,
             tableEntry.getValue(), deadlineMs));
       } else if (TableType.REALTIME.name().equals(tableType)) {
         TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
@@ -271,7 +271,7 @@ public class QueryRunner {
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
         requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap,
-            tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME,
+            tableConfig, schema, PlanFragmentMetadata.getTimeBoundary(planFragmentMetadata), TableType.REALTIME,
             tableEntry.getValue(), deadlineMs));
       } else {
         throw new IllegalArgumentException("Unsupported table type key: " + tableType);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 4d60969b48..61798f3dc7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index 845bec418a..6b29cdc82a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -38,7 +38,7 @@ import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.query.planner.logical.RexExpression;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.utils.AggregationUtils;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
index 341c0b5969..62cb7c3d45 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.OperatorStats;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -67,10 +67,10 @@ public class OperatorUtils {
     return functionName;
   }
 
-  public static void recordTableName(OperatorStats operatorStats, StageMetadata stageMetadata) {
-    if (StageMetadata.getTableName(stageMetadata) != null) {
+  public static void recordTableName(OperatorStats operatorStats, PlanFragmentMetadata planFragmentMetadata) {
+    if (PlanFragmentMetadata.getTableName(planFragmentMetadata) != null) {
       operatorStats.recordSingleStat(DataTable.MetadataKey.TABLE.getName(),
-          StageMetadata.getTableName(stageMetadata));
+          PlanFragmentMetadata.getTableName(planFragmentMetadata));
     }
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
index b00aefab9d..da987d7b9b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.query.runtime.plan;
 
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 
@@ -33,19 +33,19 @@ import org.apache.pinot.query.routing.WorkerMetadata;
 public class DistributedStagePlan {
   private int _stageId;
   private VirtualServerAddress _server;
-  private StageNode _stageRoot;
-  private StageMetadata _stageMetadata;
+  private PlanNode _stageRoot;
+  private PlanFragmentMetadata _planFragmentMetadata;
 
   public DistributedStagePlan(int stageId) {
     _stageId = stageId;
   }
 
-  public DistributedStagePlan(int stageId, VirtualServerAddress server, StageNode stageRoot,
-      StageMetadata stageMetadata) {
+  public DistributedStagePlan(int stageId, VirtualServerAddress server, PlanNode stageRoot,
+      PlanFragmentMetadata planFragmentMetadata) {
     _stageId = stageId;
     _server = server;
     _stageRoot = stageRoot;
-    _stageMetadata = stageMetadata;
+    _planFragmentMetadata = planFragmentMetadata;
   }
 
   public int getStageId() {
@@ -56,27 +56,27 @@ public class DistributedStagePlan {
     return _server;
   }
 
-  public StageNode getStageRoot() {
+  public PlanNode getStageRoot() {
     return _stageRoot;
   }
 
-  public StageMetadata getStageMetadata() {
-    return _stageMetadata;
+  public PlanFragmentMetadata getStageMetadata() {
+    return _planFragmentMetadata;
   }
 
   public void setServer(VirtualServerAddress serverAddress) {
     _server = serverAddress;
   }
 
-  public void setStageRoot(StageNode stageRoot) {
+  public void setStageRoot(PlanNode stageRoot) {
     _stageRoot = stageRoot;
   }
 
-  public void setStageMetadata(StageMetadata stageMetadata) {
-    _stageMetadata = stageMetadata;
+  public void setStageMetadata(PlanFragmentMetadata planFragmentMetadata) {
+    _planFragmentMetadata = planFragmentMetadata;
   }
 
   public WorkerMetadata getCurrentWorkerMetadata() {
-    return _stageMetadata.getWorkerMetadataList().get(_server.workerId());
+    return _planFragmentMetadata.getWorkerMetadataList().get(_server.workerId());
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 11e8107996..0fd764c00e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.query.runtime.plan;
 
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.query.runtime.operator.OpChainStats;
@@ -37,13 +37,13 @@ public class OpChainExecutionContext {
   private final VirtualServerAddress _server;
   private final long _timeoutMs;
   private final long _deadlineMs;
-  private final StageMetadata _stageMetadata;
+  private final PlanFragmentMetadata _planFragmentMetadata;
   private final OpChainId _id;
   private final OpChainStats _stats;
   private final boolean _traceEnabled;
 
   public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId,
-      VirtualServerAddress server, long timeoutMs, long deadlineMs, StageMetadata stageMetadata,
+      VirtualServerAddress server, long timeoutMs, long deadlineMs, PlanFragmentMetadata planFragmentMetadata,
       boolean traceEnabled) {
     _mailboxService = mailboxService;
     _requestId = requestId;
@@ -51,7 +51,7 @@ public class OpChainExecutionContext {
     _server = server;
     _timeoutMs = timeoutMs;
     _deadlineMs = deadlineMs;
-    _stageMetadata = stageMetadata;
+    _planFragmentMetadata = planFragmentMetadata;
     _id = new OpChainId(requestId, server.workerId(), stageId);
     _stats = new OpChainStats(_id.toString());
     _traceEnabled = traceEnabled;
@@ -87,8 +87,8 @@ public class OpChainExecutionContext {
     return _deadlineMs;
   }
 
-  public StageMetadata getStageMetadata() {
-    return _stageMetadata;
+  public PlanFragmentMetadata getStageMetadata() {
+    return _planFragmentMetadata;
   }
 
   public OpChainId getId() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 27c4302197..f8dfe58e26 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -20,20 +20,20 @@ package org.apache.pinot.query.runtime.plan;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.FilterOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
@@ -52,17 +52,17 @@ import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
 
 
 /**
- * This visitor constructs a physical plan of operators from a {@link StageNode} tree. Note that
+ * This visitor constructs a physical plan of operators from a {@link PlanNode} tree. Note that
  * this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into
  * v1 operators at this point in time.
  *
- * <p>This class should be used statically via {@link #build(StageNode, PlanRequestContext)}
+ * <p>This class should be used statically via {@link #build(PlanNode, PlanRequestContext)}
  */
-public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator, PlanRequestContext> {
+public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, PlanRequestContext> {
 
   private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor();
 
-  public static OpChain build(StageNode node, PlanRequestContext context) {
+  public static OpChain build(PlanNode node, PlanRequestContext context) {
     MultiStageOperator root = node.visit(INSTANCE, context);
     return new OpChain(context.getOpChainExecutionContext(), root, context.getReceivingMailboxIds());
   }
@@ -112,7 +112,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
   @Override
   public MultiStageOperator visitSetOp(SetOpNode setOpNode, PlanRequestContext context) {
     List<MultiStageOperator> inputs = new ArrayList<>();
-    for (StageNode input : setOpNode.getInputs()) {
+    for (PlanNode input : setOpNode.getInputs()) {
       MultiStageOperator visited = input.visit(this, context);
       inputs.add(visited);
     }
@@ -145,8 +145,8 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
 
   @Override
   public MultiStageOperator visitJoin(JoinNode node, PlanRequestContext context) {
-    StageNode left = node.getInputs().get(0);
-    StageNode right = node.getInputs().get(1);
+    PlanNode left = node.getInputs().get(0);
+    PlanNode right = node.getInputs().get(1);
 
     MultiStageOperator leftOperator = left.visit(this, context);
     MultiStageOperator rightOperator = right.visit(this, context);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
index d3d890d9d5..4383fa65c1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
@@ -21,7 +21,7 @@ package org.apache.pinot.query.runtime.plan;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 
 
@@ -33,20 +33,20 @@ public class PlanRequestContext {
   private final long _timeoutMs;
   private final long _deadlineMs;
   protected final VirtualServerAddress _server;
-  protected final StageMetadata _stageMetadata;
+  protected final PlanFragmentMetadata _planFragmentMetadata;
   protected final List<String> _receivingMailboxIds = new ArrayList<>();
   private final OpChainExecutionContext _opChainExecutionContext;
   private final boolean _traceEnabled;
 
   public PlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs, long deadlineMs,
-      VirtualServerAddress server, StageMetadata stageMetadata, boolean traceEnabled) {
+      VirtualServerAddress server, PlanFragmentMetadata planFragmentMetadata, boolean traceEnabled) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _stageId = stageId;
     _timeoutMs = timeoutMs;
     _deadlineMs = deadlineMs;
     _server = server;
-    _stageMetadata = stageMetadata;
+    _planFragmentMetadata = planFragmentMetadata;
     _traceEnabled = traceEnabled;
     _opChainExecutionContext = new OpChainExecutionContext(this);
   }
@@ -71,8 +71,8 @@ public class PlanRequestContext {
     return _server;
   }
 
-  public StageMetadata getStageMetadata() {
-    return _stageMetadata;
+  public PlanFragmentMetadata getStageMetadata() {
+    return _planFragmentMetadata;
   }
 
   public MailboxService getMailboxService() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index 512e8717e5..d712e524ab 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -37,20 +37,20 @@ import org.apache.pinot.core.query.optimizer.QueryOptimizer;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.parser.CalciteRexExpressionParser;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.ExchangeNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SetOpNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.StageNodeVisitor;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -76,7 +76,7 @@ import org.slf4j.LoggerFactory;
  * As of now, the reason why we use the plan visitor for server request is for additional support such as dynamic
  * filtering and other auxiliary functionalities.
  */
-public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPlanRequestContext> {
+public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPlanRequestContext> {
   private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
   private static final Logger LOGGER = LoggerFactory.getLogger(ServerRequestPlanVisitor.class);
   private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
@@ -162,7 +162,7 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl
     pinotQuery.setQueryOptions(queryOptions);
   }
 
-  private static void walkStageNode(StageNode node, ServerPlanRequestContext context) {
+  private static void walkStageNode(PlanNode node, ServerPlanRequestContext context) {
     node.visit(INSTANCE, context);
   }
 
@@ -264,8 +264,8 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl
     return null;
   }
 
-  private void visitChildren(StageNode node, ServerPlanRequestContext context) {
-    for (StageNode child : node.getInputs()) {
+  private void visitChildren(PlanNode node, ServerPlanRequestContext context) {
+    for (PlanNode child : node.getInputs()) {
       child.visit(this, context);
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index ee2037ec2c..1844df632c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -25,10 +25,10 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.pinot.common.proto.Worker;
-import org.apache.pinot.query.planner.stage.AbstractStageNode;
-import org.apache.pinot.query.planner.stage.StageNodeSerDeUtils;
+import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
+import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
 import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -55,7 +55,7 @@ public class QueryPlanSerDeUtils {
     return Worker.StagePlan.newBuilder()
         .setStageId(distributedStagePlan.getStageId())
         .setVirtualAddress(addressToProto(distributedStagePlan.getServer()))
-        .setStageRoot(StageNodeSerDeUtils.serializeStageNode((AbstractStageNode) distributedStagePlan.getStageRoot()))
+        .setStageRoot(StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) distributedStagePlan.getStageRoot()))
         .setStageMetadata(toProtoStageMetadata(distributedStagePlan.getStageMetadata())).build();
   }
 
@@ -79,8 +79,8 @@ public class QueryPlanSerDeUtils {
     return String.format("%s@%s:%s", serverAddress.workerId(), serverAddress.hostname(), serverAddress.port());
   }
 
-  private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata) {
-    StageMetadata.Builder builder = new StageMetadata.Builder();
+  private static PlanFragmentMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata) {
+    PlanFragmentMetadata.Builder builder = new PlanFragmentMetadata.Builder();
     List<WorkerMetadata> workerMetadataList = new ArrayList<>();
     for (Worker.WorkerMetadata protoWorkerMetadata : protoStageMetadata.getWorkerMetadataList()) {
       workerMetadataList.add(fromProtoWorkerMetadata(protoWorkerMetadata));
@@ -119,12 +119,12 @@ public class QueryPlanSerDeUtils {
     return mailboxMetadata;
   }
 
-  private static Worker.StageMetadata toProtoStageMetadata(StageMetadata stageMetadata) {
+  private static Worker.StageMetadata toProtoStageMetadata(PlanFragmentMetadata planFragmentMetadata) {
     Worker.StageMetadata.Builder builder = Worker.StageMetadata.newBuilder();
-    for (WorkerMetadata workerMetadata : stageMetadata.getWorkerMetadataList()) {
+    for (WorkerMetadata workerMetadata : planFragmentMetadata.getWorkerMetadataList()) {
       builder.addWorkerMetadata(toProtoWorkerMetadata(workerMetadata));
     }
-    builder.putAllCustomProperty(stageMetadata.getCustomProperties());
+    builder.putAllCustomProperty(planFragmentMetadata.getCustomProperties());
     return builder.build();
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 1c0f7168ff..b1ba1624b5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -22,7 +22,7 @@ import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.plan.PlanRequestContext;
 import org.apache.pinot.spi.config.table.TableType;
@@ -40,9 +40,9 @@ public class ServerPlanRequestContext extends PlanRequestContext {
   protected InstanceRequest _instanceRequest;
 
   public ServerPlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs,
-      long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata, PinotQuery pinotQuery,
+      long deadlineMs, VirtualServerAddress server, PlanFragmentMetadata planFragmentMetadata, PinotQuery pinotQuery,
       TableType tableType, TimeBoundaryInfo timeBoundaryInfo, boolean traceEnabled) {
-    super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, stageMetadata, traceEnabled);
+    super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, planFragmentMetadata, traceEnabled);
     _pinotQuery = pinotQuery;
     _tableType = tableType;
     _timeBoundaryInfo = timeBoundaryInfo;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 3524c95693..5454031a9d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -45,12 +45,12 @@ import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
 import org.apache.pinot.core.util.trace.TracedThreadFactory;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.planner.ExplainPlanStageVisitor;
+import org.apache.pinot.query.planner.ExplainPlanPlanVisitor;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -96,7 +96,7 @@ public class QueryDispatcher {
           traceEnabled);
     } catch (Exception e) {
       cancel(requestId, queryPlan);
-      throw new RuntimeException("Error executing query: " + ExplainPlanStageVisitor.explain(queryPlan), e);
+      throw new RuntimeException("Error executing query: " + ExplainPlanPlanVisitor.explain(queryPlan), e);
     }
   }
 
@@ -231,8 +231,8 @@ public class QueryDispatcher {
             rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
             if (stageStatsAggregator != null) {
               if (queryPlan != null) {
-                StageMetadata stageMetadata = queryPlan.getStageMetadata(operatorStats.getStageId());
-                OperatorUtils.recordTableName(operatorStats, stageMetadata);
+                PlanFragmentMetadata planFragmentMetadata = queryPlan.getStageMetadata(operatorStats.getStageId());
+                OperatorUtils.recordTableName(operatorStats, planFragmentMetadata);
               }
               stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
             }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 0d32e8f7bd..37465dc211 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -31,7 +31,7 @@ import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 22d26fe4fb..55e43dae11 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -48,7 +48,7 @@ import org.apache.pinot.query.QueryServerEnclosure;
 import org.apache.pinot.query.QueryTestSet;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index c9a48f9ba9..cf691b2f09 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -29,7 +29,7 @@ import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
-import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 8c602da156..1764165b1f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -31,7 +31,7 @@ import org.apache.pinot.query.mailbox.MailboxIdUtils;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -64,8 +64,8 @@ public class MailboxReceiveOperatorTest {
   private ReceivingMailbox _mailbox1;
   @Mock
   private ReceivingMailbox _mailbox2;
-  private StageMetadata _stageMetadataBoth;
-  private StageMetadata _stageMetadata1;
+  private PlanFragmentMetadata _planFragmentMetadataBoth;
+  private PlanFragmentMetadata _planFragmentMetadata1;
 
   @BeforeMethod
   public void setUp() {
@@ -74,7 +74,7 @@ public class MailboxReceiveOperatorTest {
     when(_mailboxService.getPort()).thenReturn(123);
     VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
     VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
-    _stageMetadataBoth = new StageMetadata.Builder()
+    _planFragmentMetadataBoth = new PlanFragmentMetadata.Builder()
         .setWorkerMetadataList(Stream.of(server1, server2).map(
                 s -> new WorkerMetadata.Builder()
                     .setVirtualServerAddress(s)
@@ -94,7 +94,7 @@ public class MailboxReceiveOperatorTest {
             .collect(Collectors.toList()))
         .build();
     // sending stage is 0, receiving stage is 1
-    _stageMetadata1 = new StageMetadata.Builder()
+    _planFragmentMetadata1 = new PlanFragmentMetadata.Builder()
         .setWorkerMetadataList(Stream.of(server1).map(
             s -> new WorkerMetadata.Builder()
                 .setVirtualServerAddress(s)
@@ -120,13 +120,13 @@ public class MailboxReceiveOperatorTest {
   public void shouldThrowSingletonNoMatchMailboxServer() {
     VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0);
     VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1);
-    StageMetadata stageMetadata = new StageMetadata.Builder()
+    PlanFragmentMetadata planFragmentMetadata = new PlanFragmentMetadata.Builder()
         .setWorkerMetadataList(Stream.of(server1, server2).map(
             s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()).collect(Collectors.toList()))
         .build();
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            stageMetadata, false);
+            planFragmentMetadata, false);
     //noinspection resource
     new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1);
   }
@@ -135,7 +135,7 @@ public class MailboxReceiveOperatorTest {
   public void shouldThrowReceiveSingletonFromMultiMatchMailboxServer() {
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+            _planFragmentMetadataBoth, false);
     //noinspection resource
     new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1);
   }
@@ -157,7 +157,7 @@ public class MailboxReceiveOperatorTest {
     // Short timeoutMs should result in timeout
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
-            _stageMetadata1, false);
+            _planFragmentMetadata1, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       Thread.sleep(100L);
       TransferableBlock mailbox = receiveOp.nextBlock();
@@ -168,7 +168,7 @@ public class MailboxReceiveOperatorTest {
 
     // Longer timeout or default timeout (10s) doesn't result in timeout
     context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10_000L,
-        System.currentTimeMillis() + 10_000L, _stageMetadata1, false);
+        System.currentTimeMillis() + 10_000L, _planFragmentMetadata1, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       Thread.sleep(100L);
       TransferableBlock mailbox = receiveOp.nextBlock();
@@ -182,7 +182,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+            _planFragmentMetadata1, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       assertTrue(receiveOp.nextBlock().isNoOpBlock());
     }
@@ -195,7 +195,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+            _planFragmentMetadata1, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
     }
@@ -210,7 +210,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+            _planFragmentMetadata1, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
       assertEquals(actualRows.size(), 1);
@@ -228,7 +228,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+            _planFragmentMetadata1, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       TransferableBlock block = receiveOp.nextBlock();
       assertTrue(block.isErrorBlock());
@@ -247,7 +247,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+            _planFragmentMetadataBoth, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
@@ -271,7 +271,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+            _planFragmentMetadataBoth, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       // Receive first block from server1
@@ -297,7 +297,7 @@ public class MailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+            _planFragmentMetadataBoth, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       TransferableBlock block = receiveOp.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 5080ad8454..ec7c00d513 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -165,12 +165,12 @@ public class MailboxSendOperatorTest {
   }
 
   private MailboxSendOperator getMailboxSendOperator() {
-    StageMetadata stageMetadata = new StageMetadata.Builder()
+    PlanFragmentMetadata planFragmentMetadata = new PlanFragmentMetadata.Builder()
         .setWorkerMetadataList(Collections.singletonList(
             new WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build();
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE, Long.MAX_VALUE,
-            stageMetadata, false);
+            planFragmentMetadata, false);
     return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 6d03bb6354..3ec9fc71de 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -41,7 +41,7 @@ import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.physical.MailboxIdUtils;
 import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -82,13 +82,13 @@ public class OpChainTest {
   private BlockExchange _exchange;
 
   private VirtualServerAddress _serverAddress;
-  private StageMetadata _receivingStageMetadata;
+  private PlanFragmentMetadata _receivingPlanFragmentMetadata;
 
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
     _serverAddress = new VirtualServerAddress("localhost", 123, 0);
-    _receivingStageMetadata = new StageMetadata.Builder()
+    _receivingPlanFragmentMetadata = new PlanFragmentMetadata.Builder()
         .setWorkerMetadataList(Stream.of(_serverAddress).map(
             s -> new WorkerMetadata.Builder()
                 .setVirtualServerAddress(s)
@@ -197,7 +197,7 @@ public class OpChainTest {
     int senderStageId = 1;
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, 1000,
-            System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
+            System.currentTimeMillis() + 1000, _receivingPlanFragmentMetadata, true);
 
     Stack<MultiStageOperator> operators =
         getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -211,7 +211,7 @@ public class OpChainTest {
 
     OpChainExecutionContext secondStageContext =
         new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, 1000,
-            System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
+            System.currentTimeMillis() + 1000, _receivingPlanFragmentMetadata, true);
 
     MailboxReceiveOperator secondStageReceiveOp =
         new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1);
@@ -237,7 +237,7 @@ public class OpChainTest {
     int senderStageId = 1;
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, 1000,
-            System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
+            System.currentTimeMillis() + 1000, _receivingPlanFragmentMetadata, false);
 
     Stack<MultiStageOperator> operators =
         getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -249,7 +249,7 @@ public class OpChainTest {
 
     OpChainExecutionContext secondStageContext =
         new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, 1000,
-            System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
+            System.currentTimeMillis() + 1000, _receivingPlanFragmentMetadata, false);
     MailboxReceiveOperator secondStageReceiveOp =
         new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId);
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 1a65a8457f..c17739e789 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -35,7 +35,7 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -73,8 +73,8 @@ public class SortedMailboxReceiveOperatorTest {
   @Mock
   private ReceivingMailbox _mailbox2;
 
-  private StageMetadata _stageMetadataBoth;
-  private StageMetadata _stageMetadata1;
+  private PlanFragmentMetadata _planFragmentMetadataBoth;
+  private PlanFragmentMetadata _planFragmentMetadata1;
 
   @BeforeMethod
   public void setUp() {
@@ -83,7 +83,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailboxService.getPort()).thenReturn(123);
     VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
     VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
-    _stageMetadataBoth = new StageMetadata.Builder()
+    _planFragmentMetadataBoth = new PlanFragmentMetadata.Builder()
         .setWorkerMetadataList(Stream.of(server1, server2).map(
                 s -> new WorkerMetadata.Builder()
                     .setVirtualServerAddress(s)
@@ -103,7 +103,7 @@ public class SortedMailboxReceiveOperatorTest {
             .collect(Collectors.toList()))
         .build();
     // sending stage is 0, receiving stage is 1
-    _stageMetadata1 = new StageMetadata.Builder()
+    _planFragmentMetadata1 = new PlanFragmentMetadata.Builder()
         .setWorkerMetadataList(Stream.of(server1).map(
             s -> new WorkerMetadata.Builder()
                 .setVirtualServerAddress(s)
@@ -129,13 +129,13 @@ public class SortedMailboxReceiveOperatorTest {
   public void shouldThrowSingletonNoMatchMailboxServer() {
     VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0);
     VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1);
-    StageMetadata stageMetadata = new StageMetadata.Builder()
+    PlanFragmentMetadata planFragmentMetadata = new PlanFragmentMetadata.Builder()
         .setWorkerMetadataList(Stream.of(server1, server2).map(
             s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()).collect(Collectors.toList()))
         .build();
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            stageMetadata, false);
+            planFragmentMetadata, false);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS,
         COLLATION_DIRECTIONS, false, 1);
@@ -145,7 +145,7 @@ public class SortedMailboxReceiveOperatorTest {
   public void shouldThrowReceiveSingletonFromMultiMatchMailboxServer() {
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+            _planFragmentMetadataBoth, false);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS,
         COLLATION_DIRECTIONS, false, 1);
@@ -166,7 +166,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
-            _stageMetadata1, false);
+            _planFragmentMetadata1, false);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(),
         Collections.emptyList(), false, 1);
@@ -179,7 +179,7 @@ public class SortedMailboxReceiveOperatorTest {
     // Short timeoutMs should result in timeout
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
-            _stageMetadata1, false);
+            _planFragmentMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
       Thread.sleep(100L);
@@ -191,7 +191,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     // Longer timeout or default timeout (10s) doesn't result in timeout
     context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10_000L,
-        System.currentTimeMillis() + 10_000L, _stageMetadata1, false);
+        System.currentTimeMillis() + 10_000L, _planFragmentMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
       Thread.sleep(100L);
@@ -205,7 +205,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+            _planFragmentMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
       assertTrue(receiveOp.nextBlock().isNoOpBlock());
@@ -218,7 +218,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+            _planFragmentMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
       assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -233,7 +233,7 @@ public class SortedMailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+            _planFragmentMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
       List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
@@ -251,7 +251,7 @@ public class SortedMailboxReceiveOperatorTest {
         TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+            _planFragmentMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
       TransferableBlock block = receiveOp.nextBlock();
@@ -270,7 +270,7 @@ public class SortedMailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+            _planFragmentMetadataBoth, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
       assertTrue(receiveOp.nextBlock().isNoOpBlock());
@@ -293,7 +293,7 @@ public class SortedMailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+            _planFragmentMetadataBoth, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
       TransferableBlock block = receiveOp.nextBlock();
@@ -318,7 +318,7 @@ public class SortedMailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+            _planFragmentMetadataBoth, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
       assertEquals(receiveOp.nextBlock().getContainer(), Arrays.asList(row5, row2, row4, row1, row3));
@@ -349,7 +349,7 @@ public class SortedMailboxReceiveOperatorTest {
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+            _planFragmentMetadataBoth, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirection, false, 1)) {
       assertEquals(receiveOp.nextBlock().getContainer(), Arrays.asList(row1, row2, row3, row5, row4));
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
index 7b28eca581..817c7239d6 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
@@ -30,7 +30,7 @@ import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
-import org.apache.pinot.query.planner.stage.WindowNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index ee1d337166..ab7748850f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -37,9 +37,9 @@ import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
 import org.apache.pinot.query.QueryTestSet;
 import org.apache.pinot.query.planner.QueryPlan;
-import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.routing.PlanFragmentMetadata;
 import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.query.routing.StageMetadata;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.QueryRunner;
@@ -118,7 +118,7 @@ public class QueryServerTest extends QueryTestSet {
         // submit the request for testing.
         submitRequest(queryRequest);
 
-        StageMetadata stageMetadata = queryPlan.getStageMetadata(stageId);
+        PlanFragmentMetadata planFragmentMetadata = queryPlan.getStageMetadata(stageId);
 
         // ensure mock query runner received correctly deserialized payload.
         QueryRunner mockRunner = _queryRunnerMap.get(
@@ -129,9 +129,9 @@ public class QueryServerTest extends QueryTestSet {
         TestUtils.waitForCondition(aVoid -> {
           try {
             Mockito.verify(mockRunner).processQuery(Mockito.argThat(distributedStagePlan -> {
-              StageNode stageNode = queryPlan.getQueryStageMap().get(stageId);
-              return isStageNodesEqual(stageNode, distributedStagePlan.getStageRoot())
-                  && isStageMetadataEqual(stageMetadata, distributedStagePlan.getStageMetadata());
+              PlanNode planNode = queryPlan.getQueryStageMap().get(stageId);
+              return isStageNodesEqual(planNode, distributedStagePlan.getStageRoot())
+                  && isStageMetadataEqual(planFragmentMetadata, distributedStagePlan.getStageMetadata());
             }), Mockito.argThat(requestMetadataMap ->
                 requestIdStr.equals(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID))));
             return true;
@@ -146,12 +146,13 @@ public class QueryServerTest extends QueryTestSet {
     }
   }
 
-  private boolean isStageMetadataEqual(StageMetadata expected, StageMetadata actual) {
-    if (!EqualityUtils.isEqual(StageMetadata.getTableName(expected), StageMetadata.getTableName(actual))) {
+  private boolean isStageMetadataEqual(PlanFragmentMetadata expected, PlanFragmentMetadata actual) {
+    if (!EqualityUtils.isEqual(PlanFragmentMetadata.getTableName(expected),
+        PlanFragmentMetadata.getTableName(actual))) {
       return false;
     }
-    TimeBoundaryInfo expectedTimeBoundaryInfo = StageMetadata.getTimeBoundary(expected);
-    TimeBoundaryInfo actualTimeBoundaryInfo = StageMetadata.getTimeBoundary(actual);
+    TimeBoundaryInfo expectedTimeBoundaryInfo = PlanFragmentMetadata.getTimeBoundary(expected);
+    TimeBoundaryInfo actualTimeBoundaryInfo = PlanFragmentMetadata.getTimeBoundary(actual);
     if (expectedTimeBoundaryInfo == null && actualTimeBoundaryInfo != null
         || expectedTimeBoundaryInfo != null && actualTimeBoundaryInfo == null) {
       return false;
@@ -184,15 +185,15 @@ public class QueryServerTest extends QueryTestSet {
         WorkerMetadata.getTableSegmentsMap(actual));
   }
 
-  private static boolean isStageNodesEqual(StageNode left, StageNode right) {
+  private static boolean isStageNodesEqual(PlanNode left, PlanNode right) {
     // This only checks the stage tree structure is correct. because the input/stageId fields are not
     // part of the generic proto ser/de; which is tested in query planner.
-    if (left.getStageId() != right.getStageId() || left.getClass() != right.getClass()
+    if (left.getPlanFragmentId() != right.getPlanFragmentId() || left.getClass() != right.getClass()
         || left.getInputs().size() != right.getInputs().size()) {
       return false;
     }
-    left.getInputs().sort(Comparator.comparingInt(StageNode::getStageId));
-    right.getInputs().sort(Comparator.comparingInt(StageNode::getStageId));
+    left.getInputs().sort(Comparator.comparingInt(PlanNode::getPlanFragmentId));
+    right.getInputs().sort(Comparator.comparingInt(PlanNode::getPlanFragmentId));
     for (int i = 0; i < left.getInputs().size(); i++) {
       if (!isStageNodesEqual(left.getInputs().get(i), right.getInputs().get(i))) {
         return false;
@@ -221,8 +222,8 @@ public class QueryServerTest extends QueryTestSet {
     int workerId = serverInstanceToWorkerIdMap.get(serverInstance).get(0);
 
     return Worker.QueryRequest.newBuilder().setStagePlan(QueryPlanSerDeUtils.serialize(
-        QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId,
-            new VirtualServerAddress(serverInstance, workerId))))
+            QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId,
+                new VirtualServerAddress(serverInstance, workerId))))
         // the default configurations that must exist.
         .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()))
         .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index c7a1fbaf81..7e98f1a7ae 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -102,7 +102,7 @@ public class QueryDispatcherTest extends QueryTestSet {
     QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
     QueryDispatcher dispatcher = new QueryDispatcher();
     int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(), queryPlan, 10_000L, new HashMap<>());
-    Assert.assertTrue(PlannerUtils.isRootStage(reducerStageId));
+    Assert.assertTrue(PlannerUtils.isRootPlanFragment(reducerStageId));
     dispatcher.shutdown();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org