Posted to commits@pinot.apache.org by ro...@apache.org on 2023/11/13 19:34:30 UTC

(pinot) branch master updated: [multistage][feature] leaf planning with multi-semi join support (#11937)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f30fdee7ba [multistage][feature] leaf planning with multi-semi join support (#11937)
f30fdee7ba is described below

commit f30fdee7ba5843251485d54aa121281ae03330e1
Author: Rong Rong <ro...@apache.org>
AuthorDate: Mon Nov 13 11:34:22 2023 -0800

    [multistage][feature] leaf planning with multi-semi join support (#11937)
    
    Currently, PinotQuery only supports a limited set of PlanNodes. This means an
    Exchange must be inserted to ensure the leaf stage can be converted into a
    PinotQuery. However, this is a misuse of Exchange.
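    
    For example, with this change a multi-semi-join query from the new test cases,
    
        SELECT a.col1, a.col2 FROM a
        WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)
          AND a.col2 IN (SELECT col1 FROM c WHERE c.col3 > 0)
    
    now plans both semi-joins into the leaf stage (see the PinotHintablePlans.json
    changes below) instead of forcing a hash Exchange above the first semi-join.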
    
    This PR splits the ServerRequest planning into 2 stages:
    - first, plan as much as possible into PinotQuery
    - any remainder nodes that cannot be planned into PinotQuery are run locally,
      with the LeafStageTransferableBlockOperator as their input
    
    This PR also follows up on the refactoring from https://github.com/apache/pinot/pull/11439:
    - removed the old server plan request context entirely and replaced it with fields used by the server side
    - extended OpChainExecutionContext with the additional info from ServerOpChainExecutionContext
    - broke the steps of the ServerPlanRequestUtils.build method down into 3 parts (sketched below):
        - compile the PinotQuery
        - convert the PinotQuery into instance requests (based on physical segments)
        - construct an OpChain that sits on top
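    
    Condensed sketch of the new compile path (the full version is
    ServerPlanRequestUtils.compileLeafStage in the diff below; "..." elides the
    Helix property store, metrics, and timing plumbing):
    
        OpChain compileLeafStage(OpChainExecutionContext executionContext,
            DistributedStagePlan distributedStagePlan, ...) {
          ServerPlanRequestContext serverContext = new ServerPlanRequestContext(
              distributedStagePlan, leafQueryExecutor, executorService,
              executionContext.getPipelineBreakerResult());
          // 1. walk the stage tree, filling the PinotQuery and marking the
          //    leaf-stage boundary PlanNode on the context
          ServerPlanRequestVisitor.walkStageNode(distributedStagePlan.getStageRoot(), serverContext);
          // 2. one InstanceRequest per physical table (OFFLINE/REALTIME segments),
          //    each wrapped into a ServerQueryRequest for the v1 query executor
          List<InstanceRequest> requests = constructServerQueryRequests(
              executionContext, serverContext, distributedStagePlan, ...);
          serverContext.setServerQueryRequests(requests.stream()
              .map(r -> new ServerQueryRequest(r, serverMetrics, queryArrivalTimeMs, true))
              .collect(Collectors.toList()));
          // 3. walk the remainder nodes; PhysicalPlanVisitor swaps the boundary node
          //    for a LeafStageTransferableBlockOperator that runs the v1 requests
          executionContext.setLeafStageContext(serverContext);
          return PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(), executionContext);
        }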
    
    Context: alternative PR to #11843
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../rel/rules/PinotJoinToDynamicBroadcastRule.java |   7 +-
 .../apache/calcite/rel/rules/PinotRuleUtils.java   |  48 ++-
 .../src/test/resources/queries/JoinPlans.json      |  17 +-
 .../test/resources/queries/PinotHintablePlans.json | 399 ++++++++++++++++++++-
 .../apache/pinot/query/runtime/QueryRunner.java    |  31 +-
 .../runtime/plan/OpChainExecutionContext.java      |  11 +
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  31 +-
 .../plan/server/ServerPlanRequestContext.java      |  68 ++--
 .../plan/server/ServerPlanRequestUtils.java        | 144 +++++---
 .../plan/server/ServerPlanRequestVisitor.java      | 135 ++++---
 .../test/resources/queries/FromExpressions.json    |  10 +
 .../src/test/resources/queries/QueryHints.json     | 185 +++++++++-
 12 files changed, 882 insertions(+), 204 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
index 3ae53f8cc9..e4cc9852c0 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
@@ -142,7 +142,7 @@ public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
     RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel()
         : join.getRight();
     return left instanceof Exchange && right instanceof Exchange
-        && PinotRuleUtils.noExchangeInSubtree(left.getInput(0))
+        && PinotRuleUtils.canPushDynamicBroadcastToLeaf(left.getInput(0))
         // default enable dynamic broadcast for SEMI join unless other join strategy were specified
         && (!explicitOtherStrategy && join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty());
   }
@@ -162,9 +162,6 @@ public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
         new LogicalJoin(join.getCluster(), join.getTraitSet(), left.getInput(), dynamicBroadcastExchange,
             join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
             ImmutableList.copyOf(join.getSystemFieldList()));
-    // adding pass-through exchange after join b/c currently leaf-stage doesn't support chaining operator(s) after JOIN
-    PinotLogicalExchange passThroughAfterJoinExchange =
-        PinotLogicalExchange.create(dynamicFilterJoin, RelDistributions.hash(join.analyzeCondition().leftKeys));
-    call.transformTo(passThroughAfterJoinExchange);
+    call.transformTo(dynamicFilterJoin);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
index 24f1820229..68c0dc4745 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
@@ -23,9 +23,11 @@ import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.tools.RelBuilder;
@@ -67,20 +69,42 @@ public class PinotRuleUtils {
     return unboxRel(rel) instanceof Aggregate;
   }
 
-  // TODO: optimize this part out as it is not efficient to scan the entire subtree for exchanges.
-  public static boolean noExchangeInSubtree(RelNode relNode) {
-    if (relNode instanceof HepRelVertex) {
-      relNode = ((HepRelVertex) relNode).getCurrentRel();
-    }
-    if (relNode instanceof Exchange) {
+  /**
+   * utility logic to determine if a JOIN can be pushed down to the leaf-stage execution and leverage the
+   * segment-local info (indexing and others) to speed up the execution.
+   *
+   * <p>The logic here is that the "row-representation" of the relation must not have changed. E.g. </p>
+   * <ul>
+   *   <li>`RelNode`s that are single-in, single-out are possible (Project/Filter)</li>
+   *   <li>`Join` can be stacked on top if we only consider SEMI-JOIN</li>
+   *   <li>`Window` should be allowed but we don't have an impl for Window on leaf, so not yet included.</li>
+   *   <li>`Sort` should be allowed but we need to reorder Sort and Join first, so not yet included.</li>
+   * </ul>
+   */
+  public static boolean canPushDynamicBroadcastToLeaf(RelNode relNode) {
+    // TODO 1: optimize this part out as it is not efficient to scan the entire subtree for exchanges;
+    //    we should cache the stats in the node (potentially using Trait, e.g. marking LeafTrait & IntermediateTrait)
+    // TODO 2: this part is similar to how ServerPlanRequestVisitor determines leaf-stage boundary;
+    //    we should refactor and merge both pieces of logic
+    // TODO 3: for JoinNode, currently this only works towards left-side;
+    //    we should support both left and right.
+    // TODO 4: for JoinNode, currently this only works for SEMI-JOIN, INNER-JOIN can bring in rows from both sides;
+    //    we should check only the non-pipeline-breaker side columns are accessed.
+    relNode = PinotRuleUtils.unboxRel(relNode);
+
+    if (relNode instanceof TableScan) {
+      // reaching table means it is plan-able.
+      return true;
+    } else if (relNode instanceof Project || relNode instanceof Filter) {
+      // reaching single-in, single-out RelNode means we can continue downward.
+      return canPushDynamicBroadcastToLeaf(relNode.getInput(0));
+    } else if (relNode instanceof Join) {
+      // always check only the left child for dynamic broadcast
+      return canPushDynamicBroadcastToLeaf(((Join) relNode).getLeft());
+    } else {
+      // for all others we don't allow dynamic broadcast
       return false;
     }
-    for (RelNode child : relNode.getInputs()) {
-      if (!noExchangeInSubtree(child)) {
-        return false;
-      }
-    }
-    return true;
   }
 
   public static String extractFunctionName(RexCall function) {
diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
index 6f655ec6e4..46489d8737 100644
--- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
@@ -428,15 +428,14 @@
         "output": [
           "Execution Plan",
           "\nLogicalJoin(condition=[=($2, $8)], joinType=[semi])",
-          "\n  PinotLogicalExchange(distribution=[hash[2]])",
-          "\n    LogicalJoin(condition=[=($0, $7)], joinType=[semi])",
-          "\n      LogicalFilter(condition=[<($2, 100)])",
-          "\n        LogicalTableScan(table=[[b]])",
-          "\n      PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
-          "\n        LogicalProject(col1=[$0], col2=[$1])",
-          "\n          LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'foo'))])",
-          "\n            LogicalTableScan(table=[[a]])",
-          "\n  PinotLogicalExchange(distribution=[hash[1]])",
+          "\n  LogicalJoin(condition=[=($0, $7)], joinType=[semi])",
+          "\n    LogicalFilter(condition=[<($2, 100)])",
+          "\n      LogicalTableScan(table=[[b]])",
+          "\n    PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n      LogicalProject(col1=[$0], col2=[$1])",
+          "\n        LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n  PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
           "\n    LogicalProject(col2=[$1], col3=[$2])",
           "\n      LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'foo'))])",
           "\n        LogicalTableScan(table=[[a]])",
diff --git a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
index 0fd9088c11..6b07bdf9ee 100644
--- a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
@@ -25,7 +25,22 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
         "output": [
           "Execution Plan",
-          "\nPinotLogicalExchange(distribution=[hash[0]])",
+          "\nLogicalJoin(condition=[=($0, $2)], joinType=[semi])",
+          "\n  LogicalProject(col1=[$0], col2=[$1])",
+          "\n    LogicalTableScan(table=[[a]])",
+          "\n  PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n    LogicalProject(col2=[$1], col3=[$2])",
+          "\n      LogicalFilter(condition=[>($2, 0)])",
+          "\n        LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "semi-join with multi-ple dynamic_broadcast join strategy then group-by on same key",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) AND a.col2 IN (select col1 FROM c WHERE c.col3 > 0)",
+        "output": [
+          "Execution Plan",
+          "\nLogicalJoin(condition=[=($1, $2)], joinType=[semi])",
           "\n  LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
           "\n    LogicalProject(col1=[$0], col2=[$1])",
           "\n      LogicalTableScan(table=[[a]])",
@@ -33,6 +48,33 @@
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalFilter(condition=[>($2, 0)])",
           "\n          LogicalTableScan(table=[[b]])",
+          "\n  PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n    LogicalProject(col1=[$0], col3=[$2])",
+          "\n      LogicalFilter(condition=[>($2, 0)])",
+          "\n        LogicalTableScan(table=[[c]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "semi-join with multi-ple dynamic_broadcast join strategy then group-by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) AND a.col2 IN (select col1 FROM c WHERE c.col3 > 0) GROUP BY 1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
+          "\n      LogicalJoin(condition=[=($1, $3)], joinType=[semi])",
+          "\n        LogicalJoin(condition=[=($0, $3)], joinType=[semi])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n          PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n            LogicalProject(col2=[$1], col3=[$2])",
+          "\n              LogicalFilter(condition=[>($2, 0)])",
+          "\n                LogicalTableScan(table=[[b]])",
+          "\n        PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n          LogicalProject(col1=[$0], col3=[$2])",
+          "\n            LogicalFilter(condition=[>($2, 0)])",
+          "\n              LogicalTableScan(table=[[c]])",
           "\n"
         ]
       },
@@ -42,14 +84,13 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
-          "\n  PinotLogicalExchange(distribution=[hash[0]])",
-          "\n    LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
-          "\n      LogicalProject(col1=[$0], col3=[$2])",
-          "\n        LogicalTableScan(table=[[a]])",
-          "\n      PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
-          "\n        LogicalProject(col2=[$1], col3=[$2])",
-          "\n          LogicalFilter(condition=[>($2, 0)])",
-          "\n            LogicalTableScan(table=[[b]])",
+          "\n  LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
+          "\n    LogicalProject(col1=[$0], col3=[$2])",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n    PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n      LogicalProject(col2=[$1], col3=[$2])",
+          "\n        LogicalFilter(condition=[>($2, 0)])",
+          "\n          LogicalTableScan(table=[[b]])",
           "\n"
         ]
       },
@@ -61,14 +102,13 @@
           "\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
           "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalAggregate(group=[{1}], agg#0=[$SUM0($2)])",
-          "\n      PinotLogicalExchange(distribution=[hash[0]])",
-          "\n        LogicalJoin(condition=[=($0, $3)], joinType=[semi])",
-          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n            LogicalTableScan(table=[[a]])",
-          "\n          PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
-          "\n            LogicalProject(col2=[$1], col3=[$2])",
-          "\n              LogicalFilter(condition=[>($2, 0)])",
-          "\n                LogicalTableScan(table=[[b]])",
+          "\n      LogicalJoin(condition=[=($0, $3)], joinType=[semi])",
+          "\n        LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n          LogicalProject(col2=[$1], col3=[$2])",
+          "\n            LogicalFilter(condition=[>($2, 0)])",
+          "\n              LogicalTableScan(table=[[b]])",
           "\n"
         ]
       },
@@ -127,5 +167,330 @@
         ]
       }
     ]
+  },
+  "pinot_table_partition_option_tests": {
+    "queries": [
+      {
+        "description": "join with colocated tables",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col2=[$0], col3=[$1], col30=[$3])",
+          "\n  LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$1], col3=[$2])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col1=[$0], col3=[$2])",
+          "\n        LogicalFilter(condition=[>($2, 0)])",
+          "\n          LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "group by with pre-partitioned tables on partition column",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ GROUP BY 1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{1}], agg#0=[$SUM0($2)])",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "group by with pre-partitioned tables on non-partition column",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ GROUP BY 1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "join with colocated tables then group-by left join key column",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0 GROUP BY 1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n      LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col1=[$0])",
+          "\n            LogicalFilter(condition=[>($2, 0)])",
+          "\n              LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "join with colocated tables then group-by left join key column and other columns",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, b.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0 GROUP BY 1, 2",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n    LogicalAggregate(group=[{0, 3}], agg#0=[$SUM0($1)])",
+          "\n      LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalFilter(condition=[>($2, 0)])",
+          "\n              LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "join with colocated tables then group-by other columns",
+        "sql": "EXPLAIN PLAN FOR SELECT b.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0 GROUP BY 1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{3}], agg#0=[$SUM0($1)])",
+          "\n      LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalFilter(condition=[>($2, 0)])",
+          "\n              LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "agg + semi-join on colocated tables then group by on partition column",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col3 > 0) GROUP BY 1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n      LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
+          "\n        LogicalProject(col2=[$1], col3=[$2])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n          LogicalProject(col1=[$0], col3=[$2])",
+          "\n            LogicalFilter(condition=[>($2, 0)])",
+          "\n              LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main tables then group by on partition column",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b WHERE b.col3 > 0) GROUP BY 1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n      LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
+          "\n        LogicalProject(col2=[$1], col3=[$2])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n          LogicalProject(col1=[$0], col3=[$2])",
+          "\n            LogicalFilter(condition=[>($2, 0)])",
+          "\n              LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main tables then group by on non-partitioned column",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b WHERE b.col3 > 0) GROUP BY 1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
+          "\n      LogicalJoin(condition=[=($1, $3)], joinType=[semi])",
+          "\n        LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n          LogicalProject(col1=[$0], col3=[$2])",
+          "\n            LogicalFilter(condition=[>($2, 0)])",
+          "\n              LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main tables with group by on partitioned column on semi table",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col3 > 0 GROUP BY 1 HAVING COUNT(*) > 1) GROUP BY 1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
+          "\n      LogicalJoin(condition=[=($1, $3)], joinType=[semi])",
+          "\n        LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n          LogicalFilter(condition=[>($1, 1)])",
+          "\n            LogicalAggregate(group=[{0}], agg#0=[COUNT($1)])",
+          "\n              PinotLogicalExchange(distribution=[hash[0]])",
+          "\n                LogicalAggregate(group=[{0}], agg#0=[COUNT()])",
+          "\n                  LogicalProject(col1=[$0], col3=[$2])",
+          "\n                    LogicalFilter(condition=[>($2, 0)])",
+          "\n                      LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col3 > 0) GROUP BY 1 HAVING COUNT(*) > 5",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], EXPR$1=[$1])",
+          "\n  LogicalFilter(condition=[>($2, 5)])",
+          "\n    LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT($2)])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
+          "\n        LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
+          "\n          LogicalJoin(condition=[=($1, $3)], joinType=[semi])",
+          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n            PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n              LogicalProject(col1=[$0], col3=[$2])",
+          "\n                LogicalFilter(condition=[>($2, 0)])",
+          "\n                  LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col3 > 0) GROUP BY 1 ORDER BY 2 DESC",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$1], dir0=[DESC], offset=[0])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalSort(sort0=[$1], dir0=[DESC])",
+          "\n      LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
+          "\n            LogicalJoin(condition=[=($1, $3)], joinType=[semi])",
+          "\n              LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n                LogicalTableScan(table=[[a]])",
+          "\n              PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n                LogicalProject(col1=[$0], col3=[$2])",
+          "\n                  LogicalFilter(condition=[>($2, 0)])",
+          "\n                    LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "co-partition agg + semi-join with colocated tables & agg hint",
+        "sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col3 > 0) GROUP BY 1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
+          "\n  LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
+          "\n    LogicalProject(col2=[$1], col3=[$2])",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n    PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n      LogicalProject(col1=[$0], col3=[$2])",
+          "\n        LogicalFilter(condition=[>($2, 0)])",
+          "\n          LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "co-partition agg + semi-join with single table partition & agg hint",
+        "sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b WHERE b.col3 > 0) GROUP BY 1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
+          "\n  LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
+          "\n    LogicalProject(col2=[$1], col3=[$2])",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n    PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n      LogicalProject(col1=[$0], col3=[$2])",
+          "\n        LogicalFilter(condition=[>($2, 0)])",
+          "\n          LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "co-partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join",
+        "sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col3 > 0) GROUP BY 1 HAVING COUNT(*) > 5",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col2=[$0], EXPR$1=[$1])",
+          "\n  LogicalFilter(condition=[>($2, 5)])",
+          "\n    LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)], agg#1=[COUNT()])",
+          "\n      LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
+          "\n        LogicalProject(col2=[$1], col3=[$2])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n          LogicalProject(col1=[$0], col3=[$2])",
+          "\n            LogicalFilter(condition=[>($2, 0)])",
+          "\n              LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "co-partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join colocated on partition key",
+        "sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col3 > 0) GROUP BY 1 ORDER BY 2 DESC",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$1], dir0=[DESC], offset=[0])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalSort(sort0=[$1], dir0=[DESC])",
+          "\n      LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
+          "\n        LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
+          "\n          LogicalProject(col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n          PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n            LogicalProject(col1=[$0], col3=[$2])",
+          "\n              LogicalFilter(condition=[>($2, 0)])",
+          "\n                LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "semi-join on pre-partitioned main tables with sorting on top of semi join on join key",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col3 > 0) ORDER BY 1 DESC",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[DESC], offset=[0])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalSort(sort0=[$0], dir0=[DESC])",
+          "\n      LogicalJoin(condition=[=($0, $1)], joinType=[semi])",
+          "\n        LogicalProject(col2=[$1])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n          LogicalProject(col1=[$0], col3=[$2])",
+          "\n            LogicalFilter(condition=[>($2, 0)])",
+          "\n              LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "semi-join on pre-partitioned main tables with sorting on top of semi join with non-join columns",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, a.col3 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */ WHERE b.col3 > 0) ORDER BY 2 DESC",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$1], dir0=[DESC], offset=[0])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalSort(sort0=[$1], dir0=[DESC])",
+          "\n      LogicalProject(col1=[$0], col3=[$2])",
+          "\n        LogicalJoin(condition=[=($1, $3)], joinType=[semi])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n          PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
+          "\n            LogicalProject(col1=[$0], col3=[$2])",
+          "\n              LogicalFilter(condition=[>($2, 0)])",
+          "\n                LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      }
+    ]
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 8c441e6524..526a489fb6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.query.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,7 +31,6 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.query.mailbox.MailboxIdUtils;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.plannode.MailboxSendNode;
@@ -40,16 +38,12 @@ import org.apache.pinot.query.routing.MailboxMetadata;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
 import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
-import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
-import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
-import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
-import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -193,7 +187,8 @@ public class QueryRunner {
             pipelineBreakerResult);
     OpChain opChain;
     if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
-      opChain = compileLeafStage(executionContext, distributedStagePlan);
+      opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, distributedStagePlan, _helixManager,
+          _serverMetrics, _leafQueryExecutor, _executorService);
     } else {
       opChain = PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(), executionContext);
     }
@@ -246,26 +241,4 @@ public class QueryRunner {
   public void cancel(long requestId) {
     _opChainScheduler.cancel(requestId);
   }
-
-  private OpChain compileLeafStage(OpChainExecutionContext executionContext,
-      DistributedStagePlan distributedStagePlan) {
-    List<ServerPlanRequestContext> serverPlanRequestContexts =
-        ServerPlanRequestUtils.constructServerQueryRequests(executionContext, distributedStagePlan,
-            _helixManager.getHelixPropertyStore());
-    List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size());
-    long queryArrivalTimeMs = System.currentTimeMillis();
-    for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
-      serverQueryRequests.add(
-          new ServerQueryRequest(requestContext.getInstanceRequest(), _serverMetrics, queryArrivalTimeMs, true));
-    }
-    MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
-    MultiStageOperator leafStageOperator =
-        new LeafStageTransferableBlockOperator(executionContext, serverQueryRequests, sendNode.getDataSchema(),
-            _leafQueryExecutor, _executorService);
-    MailboxSendOperator mailboxSendOperator =
-        new MailboxSendOperator(executionContext, leafStageOperator, sendNode.getDistributionType(),
-            sendNode.getDistributionKeys(), sendNode.getCollationKeys(), sendNode.getCollationDirections(),
-            sendNode.isSortOnSender(), sendNode.getReceiverStageId());
-    return new OpChain(executionContext, mailboxSendOperator);
-  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 73d1b6ee05..10069167d6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -25,6 +25,7 @@ import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.query.runtime.operator.OpChainStats;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
+import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
 import org.apache.pinot.spi.utils.CommonConstants;
 
 
@@ -46,6 +47,8 @@ public class OpChainExecutionContext {
   private final PipelineBreakerResult _pipelineBreakerResult;
   private final boolean _traceEnabled;
 
+  private ServerPlanRequestContext _leafStageContext;
+
   public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId,
       VirtualServerAddress server, long deadlineMs, Map<String, String> opChainMetadata, StageMetadata stageMetadata,
       PipelineBreakerResult pipelineBreakerResult) {
@@ -108,4 +111,12 @@ public class OpChainExecutionContext {
   public boolean isTraceEnabled() {
     return _traceEnabled;
   }
+
+  public ServerPlanRequestContext getLeafStageContext() {
+    return _leafStageContext;
+  }
+
+  public void setLeafStageContext(ServerPlanRequestContext leafStageContext) {
+    _leafStageContext = leafStageContext;
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 861cb4abc7..9c0063ebc6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -38,6 +38,7 @@ import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.FilterOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
 import org.apache.pinot.query.runtime.operator.IntersectOperator;
+import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
 import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
@@ -49,6 +50,7 @@ import org.apache.pinot.query.runtime.operator.SortedMailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.TransformOperator;
 import org.apache.pinot.query.runtime.operator.UnionOperator;
 import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
+import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
 
 
 /**
@@ -67,6 +69,17 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
     return new OpChain(context, root);
   }
 
+  private <T extends PlanNode> MultiStageOperator visit(T node, OpChainExecutionContext context) {
+    if (context.getLeafStageContext() != null && context.getLeafStageContext().getLeafStageBoundaryNode() == node) {
+      ServerPlanRequestContext leafStageContext = context.getLeafStageContext();
+      return new LeafStageTransferableBlockOperator(context, leafStageContext.getServerQueryRequests(),
+          leafStageContext.getLeafStageBoundaryNode().getDataSchema(), leafStageContext.getLeafQueryExecutor(),
+          leafStageContext.getExecutorService());
+    } else {
+      return node.visit(this, context);
+    }
+  }
+
   @Override
   public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, OpChainExecutionContext context) {
     if (node.isSortOnReceiver()) {
@@ -80,21 +93,21 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
 
   @Override
   public MultiStageOperator visitMailboxSend(MailboxSendNode node, OpChainExecutionContext context) {
-    MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+    MultiStageOperator nextOperator = visit(node.getInputs().get(0), context);
     return new MailboxSendOperator(context, nextOperator, node.getDistributionType(), node.getDistributionKeys(),
         node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(), node.getReceiverStageId());
   }
 
   @Override
   public MultiStageOperator visitAggregate(AggregateNode node, OpChainExecutionContext context) {
-    MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+    MultiStageOperator nextOperator = visit(node.getInputs().get(0), context);
     return new AggregateOperator(context, nextOperator, node.getDataSchema(), node.getAggCalls(),
         node.getGroupSet(), node.getAggType(), node.getFilterArgIndices(), node.getNodeHint());
   }
 
   @Override
   public MultiStageOperator visitWindow(WindowNode node, OpChainExecutionContext context) {
-    MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+    MultiStageOperator nextOperator = visit(node.getInputs().get(0), context);
     return new WindowAggregateOperator(context, nextOperator, node.getGroupSet(), node.getOrderSet(),
         node.getOrderSetDirection(), node.getOrderSetNullDirection(), node.getAggCalls(), node.getLowerBound(),
         node.getUpperBound(), node.getWindowFrameType(), node.getConstants(), node.getDataSchema(),
@@ -105,7 +118,7 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
   public MultiStageOperator visitSetOp(SetOpNode setOpNode, OpChainExecutionContext context) {
     List<MultiStageOperator> inputs = new ArrayList<>();
     for (PlanNode input : setOpNode.getInputs()) {
-      MultiStageOperator visited = input.visit(this, context);
+      MultiStageOperator visited = visit(input, context);
       inputs.add(visited);
     }
     switch (setOpNode.getSetOpType()) {
@@ -127,7 +140,7 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
 
   @Override
   public MultiStageOperator visitFilter(FilterNode node, OpChainExecutionContext context) {
-    MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+    MultiStageOperator nextOperator = visit(node.getInputs().get(0), context);
     return new FilterOperator(context, nextOperator, node.getDataSchema(), node.getCondition());
   }
 
@@ -136,22 +149,22 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
     PlanNode left = node.getInputs().get(0);
     PlanNode right = node.getInputs().get(1);
 
-    MultiStageOperator leftOperator = left.visit(this, context);
-    MultiStageOperator rightOperator = right.visit(this, context);
+    MultiStageOperator leftOperator = visit(left, context);
+    MultiStageOperator rightOperator = visit(right, context);
 
     return new HashJoinOperator(context, leftOperator, rightOperator, left.getDataSchema(), node);
   }
 
   @Override
   public MultiStageOperator visitProject(ProjectNode node, OpChainExecutionContext context) {
-    MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+    MultiStageOperator nextOperator = visit(node.getInputs().get(0), context);
     return new TransformOperator(context, nextOperator, node.getDataSchema(), node.getProjects(),
         node.getInputs().get(0).getDataSchema());
   }
 
   @Override
   public MultiStageOperator visitSort(SortNode node, OpChainExecutionContext context) {
-    MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+    MultiStageOperator nextOperator = visit(node.getInputs().get(0), context);
     boolean isInputSorted = nextOperator instanceof SortedMailboxReceiveOperator;
     return new SortOperator(context, nextOperator, node.getCollationKeys(), node.getCollationDirections(),
         node.getCollationNullDirections(), node.getFetch(), node.getOffset(), node.getDataSchema(), isInputSorted);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index bc78678fa3..33a955f709 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -18,47 +18,75 @@
  */
 package org.apache.pinot.query.runtime.plan.server;
 
-import org.apache.pinot.common.request.InstanceRequest;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 
 
 /**
  * Context class for converting a {@link org.apache.pinot.query.runtime.plan.DistributedStagePlan} into
  * {@link PinotQuery} to execute on server.
+ *
+ * On the leaf-stage server node, the {@link PlanNode} tree is split into a {@link PinotQuery} part and
+ *     an {@link org.apache.pinot.query.runtime.operator.OpChain} part.
  */
 public class ServerPlanRequestContext {
-  private final OpChainExecutionContext _executionContext;
-  private final TableType _tableType;
+  private final DistributedStagePlan _stagePlan;
+  private final QueryExecutor _leafQueryExecutor;
+  private final ExecutorService _executorService;
+  private final PipelineBreakerResult _pipelineBreakerResult;
+
+  private final PinotQuery _pinotQuery;
+  private PlanNode _leafStageBoundaryNode;
+  private List<ServerQueryRequest> _serverQueryRequests;
 
-  private PinotQuery _pinotQuery;
-  private InstanceRequest _instanceRequest;
+  public ServerPlanRequestContext(DistributedStagePlan stagePlan, QueryExecutor leafQueryExecutor,
+      ExecutorService executorService, PipelineBreakerResult pipelineBreakerResult) {
+    _stagePlan = stagePlan;
+    _leafQueryExecutor = leafQueryExecutor;
+    _executorService = executorService;
+    _pipelineBreakerResult = pipelineBreakerResult;
+    _pinotQuery = new PinotQuery();
+  }
+
+  public DistributedStagePlan getStagePlan() {
+    return _stagePlan;
+  }
 
-  public ServerPlanRequestContext(OpChainExecutionContext executionContext, PinotQuery pinotQuery,
-      TableType tableType) {
-    _executionContext = executionContext;
-    _pinotQuery = pinotQuery;
-    _tableType = tableType;
+  public QueryExecutor getLeafQueryExecutor() {
+    return _leafQueryExecutor;
   }
 
-  public OpChainExecutionContext getExecutionContext() {
-    return _executionContext;
+  public ExecutorService getExecutorService() {
+    return _executorService;
   }
 
-  public TableType getTableType() {
-    return _tableType;
+  public PipelineBreakerResult getPipelineBreakerResult() {
+    return _pipelineBreakerResult;
   }
 
   public PinotQuery getPinotQuery() {
     return _pinotQuery;
   }
 
-  public void setInstanceRequest(InstanceRequest instanceRequest) {
-    _instanceRequest = instanceRequest;
+  public PlanNode getLeafStageBoundaryNode() {
+    return _leafStageBoundaryNode;
+  }
+
+  public void setLeafStageBoundaryNode(PlanNode leafStageBoundaryNode) {
+    _leafStageBoundaryNode = leafStageBoundaryNode;
+  }
+
+  public List<ServerQueryRequest> getServerQueryRequests() {
+    return _serverQueryRequests;
   }
 
-  public InstanceRequest getInstanceRequest() {
-    return _instanceRequest;
+  public void setServerQueryRequests(List<ServerQueryRequest> serverQueryRequests) {
+    _serverQueryRequests = serverQueryRequests;
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 897e3c3508..b2ead5a041 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -25,9 +25,13 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.DataSource;
 import org.apache.pinot.common.request.Expression;
@@ -37,12 +41,17 @@ import org.apache.pinot.common.request.QuerySource;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.optimizer.QueryOptimizer;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.routing.WorkerMetadata;
+import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
 import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -73,22 +82,66 @@ public class ServerPlanRequestUtils {
   private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
 
   /**
-   * Entry point to construct a {@link ServerPlanRequestContext} for executing leaf-stage runner.
+   * Main entry point for compiling a leaf-stage {@link DistributedStagePlan}.
    *
-   * @param executionContext execution context of the stage.
-   * @param distributedStagePlan distributed stage plan of the stage.
+   * @param executionContext the execution context used by the leaf-stage execution engine.
+   * @param distributedStagePlan the distributed stage plan on the leaf.
+   * @return an OpChain that executes the leaf stage, with the leaf-stage execution encapsulated within.
+   */
+  public static OpChain compileLeafStage(OpChainExecutionContext executionContext,
+      DistributedStagePlan distributedStagePlan, HelixManager helixManager, ServerMetrics serverMetrics,
+      QueryExecutor leafQueryExecutor, ExecutorService executorService) {
+    long queryArrivalTimeMs = System.currentTimeMillis();
+    ServerPlanRequestContext serverContext = new ServerPlanRequestContext(distributedStagePlan, leafQueryExecutor,
+        executorService, executionContext.getPipelineBreakerResult());
+    // 1. compile the PinotQuery
+    constructPinotQueryPlan(serverContext);
+    // 2. convert PinotQuery into InstanceRequest list (one for each physical table)
+    List<InstanceRequest> instanceRequestList =
+        ServerPlanRequestUtils.constructServerQueryRequests(executionContext, serverContext, distributedStagePlan,
+            helixManager.getHelixPropertyStore());
+    serverContext.setServerQueryRequests(instanceRequestList.stream()
+        .map(instanceRequest -> new ServerQueryRequest(instanceRequest, serverMetrics, queryArrivalTimeMs, true))
+        .collect(Collectors.toList()));
+    // 3. compile the OpChain on top of the leaf-stage boundary
+    executionContext.setLeafStageContext(serverContext);
+    return PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(), executionContext);
+  }
+
+  /**
+   * First step of the server physical plan: construct the {@link PinotQuery} and determine the leaf-stage boundary
+   * {@link PlanNode}.
+   *
+   * It constructs the content for {@link ServerPlanRequestContext#getPinotQuery()} and sets the boundary via:
+   *   {@link ServerPlanRequestContext#setLeafStageBoundaryNode(PlanNode)}.
+   */
+  private static void constructPinotQueryPlan(ServerPlanRequestContext serverContext) {
+    DistributedStagePlan stagePlan = serverContext.getStagePlan();
+    PinotQuery pinotQuery = serverContext.getPinotQuery();
+    pinotQuery.setExplain(false);
+    // visit the plan to create the PinotQuery and determine the leaf-stage boundary PlanNode.
+    ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext);
+  }
+
+  /**
+   * Entry point to construct a list of {@link InstanceRequest}s for executing the leaf-stage v1 runner.
+   *
+   * @param serverContext the server opChain execution context of the stage.
    * @param helixPropertyStore helix property store used to fetch table config and schema for leaf-stage execution.
-   * @return a list of server plan request context to be run
+   * @return a list of server instance requests to be run.
    */
-  public static List<ServerPlanRequestContext> constructServerQueryRequests(OpChainExecutionContext executionContext,
-      DistributedStagePlan distributedStagePlan, ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
+  public static List<InstanceRequest> constructServerQueryRequests(OpChainExecutionContext executionContext,
+      ServerPlanRequestContext serverContext, DistributedStagePlan distributedStagePlan,
+      ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
     StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
     WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
     String rawTableName = StageMetadata.getTableName(stageMetadata);
+    int stageId = distributedStagePlan.getStageId();
     Map<String, List<String>> tableToSegmentListMap = WorkerMetadata.getTableSegmentsMap(workerMetadata);
-    List<ServerPlanRequestContext> requests = new ArrayList<>();
+    List<InstanceRequest> requests = new ArrayList<>();
     for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
       String tableType = tableEntry.getKey();
+      List<String> segmentList = tableEntry.getValue();
       // ZkHelixPropertyStore extends ZkCacheBaseDataAccessor, so it should not cause too much out-of-the-box
       // network traffic, but there's a chance to improve this:
       // TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments.
@@ -97,15 +150,15 @@ public class ServerPlanRequestUtils {
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
-        requests.add(ServerPlanRequestUtils.build(executionContext, distributedStagePlan, tableConfig, schema,
-            StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue()));
+        requests.add(ServerPlanRequestUtils.compileInstanceRequest(executionContext, serverContext, stageId,
+            tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, segmentList));
       } else if (TableType.REALTIME.name().equals(tableType)) {
         TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
-        requests.add(ServerPlanRequestUtils.build(executionContext, distributedStagePlan, tableConfig, schema,
-            StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue()));
+        requests.add(ServerPlanRequestUtils.compileInstanceRequest(executionContext, serverContext, stageId,
+            tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, segmentList));
       } else {
         throw new IllegalArgumentException("Unsupported table type key: " + tableType);
       }
@@ -113,32 +166,35 @@ public class ServerPlanRequestUtils {
     return requests;
   }
 
-  private static ServerPlanRequestContext build(OpChainExecutionContext executionContext,
-      DistributedStagePlan stagePlan, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo,
-      TableType tableType, List<String> segmentList) {
-    // Before-visit: construct the ServerPlanRequestContext baseline
+  /**
+   * Convert {@link PinotQuery} into an {@link InstanceRequest}.
+   */
+  private static InstanceRequest compileInstanceRequest(OpChainExecutionContext executionContext,
+      ServerPlanRequestContext serverContext, int stageId, TableConfig tableConfig, Schema schema,
+      TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList) {
     // Make a unique requestId for leaf stages; otherwise it causes problems with stats/metrics/tracing.
-    long requestId = (executionContext.getRequestId() << 16) + ((long) stagePlan.getStageId() << 8) + (
-        tableType == TableType.REALTIME ? 1 : 0);
-    PinotQuery pinotQuery = new PinotQuery();
+    long requestId =
+        (executionContext.getRequestId() << 16) + ((long) stageId << 8) + (tableType == TableType.REALTIME ? 1 : 0);
+    // 1. make a deep copy of the PinotQuery and modify it accordingly
+    PinotQuery pinotQuery = new PinotQuery(serverContext.getPinotQuery());
+    //   - attach leaf node limit
     Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(executionContext.getOpChainMetadata());
     if (leafNodeLimit != null) {
       pinotQuery.setLimit(leafNodeLimit);
     } else {
       pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
     }
-    LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit);
-    pinotQuery.setExplain(false);
-    ServerPlanRequestContext serverContext = new ServerPlanRequestContext(executionContext, pinotQuery, tableType);
-
-    // visit the plan and create query physical plan.
-    ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext);
-
-    // Post-visit: finalize context.
-    // 1. global rewrite/optimize
+    //   - attach table type
+    DataSource dataSource = pinotQuery.getDataSource();
+    String rawTableName = dataSource.getTableName();
+    String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
+    dataSource.setTableName(tableNameWithType);
+    pinotQuery.setDataSource(dataSource);
+    //   - attach time boundary.
     if (timeBoundaryInfo != null) {
       attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType == TableType.OFFLINE);
     }
+    //   - perform global rewrite/optimize
     for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
       pinotQuery = queryRewriter.rewrite(pinotQuery);
     }
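
    For illustration only, not part of the patch: the requestId packing above keeps
    leaf requests unique across stages and table types by reserving the low 8 bits
    for the table type and the next 8 bits for the stage id. A minimal sketch, with
    a hypothetical helper name:

        // illustrative helper mirroring the packing above; packLeafRequestId is a made-up name
        static long packLeafRequestId(long requestId, int stageId, boolean realtime) {
          // low byte: table type (1 = REALTIME, 0 = OFFLINE); next byte: stage id;
          // remaining bits: the query's requestId
          return (requestId << 16) + ((long) stageId << 8) + (realtime ? 1 : 0);
        }
        // e.g. packLeafRequestId(5, 2, true) == (5 << 16) + (2 << 8) + 1 == 328193, so the
        // OFFLINE and REALTIME requests of a hybrid table never share an id.
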
@@ -147,15 +203,12 @@ public class ServerPlanRequestUtils {
     // 2. set pinot query options according to requestMetadataMap
     updateQueryOptions(pinotQuery, executionContext);
 
-    // 3. wrapped around in broker request
+    // 3. wrap it in a BrokerRequest, using the actual table name with type.
     BrokerRequest brokerRequest = new BrokerRequest();
     brokerRequest.setPinotQuery(pinotQuery);
-    DataSource dataSource = pinotQuery.getDataSource();
-    if (dataSource != null) {
-      QuerySource querySource = new QuerySource();
-      querySource.setTableName(dataSource.getTableName());
-      brokerRequest.setQuerySource(querySource);
-    }
+    QuerySource querySource = new QuerySource();
+    querySource.setTableName(dataSource.getTableName());
+    brokerRequest.setQuerySource(querySource);
 
     // 4. create instance request with segmentList
     InstanceRequest instanceRequest = new InstanceRequest();
@@ -165,8 +218,7 @@ public class ServerPlanRequestUtils {
     instanceRequest.setSearchSegments(segmentList);
     instanceRequest.setQuery(brokerRequest);
 
-    serverContext.setInstanceRequest(instanceRequest);
-    return serverContext;
+    return instanceRequest;
   }
 
   /**
@@ -211,13 +263,19 @@ public class ServerPlanRequestUtils {
     List<Expression> expressions = new ArrayList<>();
     for (int i = 0; i < leftJoinKeys.size(); i++) {
       Expression leftExpr = pinotQuery.getSelectList().get(leftJoinKeys.get(i));
-      int rightIdx = rightJoinKeys.get(i);
-      Expression inFilterExpr = RequestUtils.getFunctionExpression(FilterKind.IN.name());
-      List<Expression> operands = new ArrayList<>(dataContainer.size() + 1);
-      operands.add(leftExpr);
-      operands.addAll(computeInOperands(dataContainer, dataSchema, rightIdx));
-      inFilterExpr.getFunctionCall().setOperands(operands);
-      expressions.add(inFilterExpr);
+      if (dataContainer.isEmpty()) {
+        // an empty right side matches nothing: put a constant false expression
+        Expression constantFalseExpr = RequestUtils.getLiteralExpression(false);
+        expressions.add(constantFalseExpr);
+      } else {
+        int rightIdx = rightJoinKeys.get(i);
+        Expression inFilterExpr = RequestUtils.getFunctionExpression(FilterKind.IN.name());
+        List<Expression> operands = new ArrayList<>(dataContainer.size() + 1);
+        operands.add(leftExpr);
+        operands.addAll(computeInOperands(dataContainer, dataSchema, rightIdx));
+        inFilterExpr.getFunctionCall().setOperands(operands);
+        expressions.add(inFilterExpr);
+      }
     }
     attachFilterExpression(pinotQuery, FilterKind.AND, expressions);
   }
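
    For illustration only, not part of the patch: attachDynamicFilter above rewrites
    a SEMI-JOIN into a leaf-stage filter, and now handles an empty right side with a
    literal FALSE filter instead of the previous LIMIT 0 shortcut. A minimal sketch
    of the two branches, assuming the join-key expression leftExpr, the collected
    right-side rows rightRows, and their literal IN operands inOperands are already
    in hand:

        // an empty right side matches nothing, so attach a constant false filter;
        // otherwise rewrite the join key as `leftExpr IN (v1, v2, ...)`
        Expression filter;
        if (rightRows.isEmpty()) {
          filter = RequestUtils.getLiteralExpression(false);
        } else {
          filter = RequestUtils.getFunctionExpression(FilterKind.IN.name());
          List<Expression> operands = new ArrayList<>(rightRows.size() + 1);
          operands.add(leftExpr);
          operands.addAll(inOperands);  // literals built from the right-side rows, see computeInOperands
          filter.getFunctionCall().setOperands(operands);
        }
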
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
index e451c42815..afab2a8259 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -44,6 +44,7 @@ import org.apache.pinot.query.planner.plannode.ValueNode;
 import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
@@ -65,38 +66,60 @@ public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPla
 
   @Override
   public Void visitAggregate(AggregateNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    // set group-by list
-    context.getPinotQuery()
-        .setGroupByList(CalciteRexExpressionParser.convertGroupByList(node.getGroupSet(), context.getPinotQuery()));
-    // set agg list
-    context.getPinotQuery().setSelectList(
-        CalciteRexExpressionParser.convertAggregateList(context.getPinotQuery().getGroupByList(), node.getAggCalls(),
-            node.getFilterArgIndices(), context.getPinotQuery()));
+    if (visit(node.getInputs().get(0), context)) {
+      PinotQuery pinotQuery = context.getPinotQuery();
+      if (pinotQuery.getGroupByList() == null) {
+        // set group-by list
+        pinotQuery.setGroupByList(CalciteRexExpressionParser.convertGroupByList(node.getGroupSet(), pinotQuery));
+        // set agg list
+        pinotQuery.setSelectList(
+            CalciteRexExpressionParser.convertAggregateList(pinotQuery.getGroupByList(), node.getAggCalls(),
+                node.getFilterArgIndices(), pinotQuery));
+        if (node.getAggType() == AggregateNode.AggType.DIRECT) {
+          pinotQuery.putToQueryOptions(CommonConstants.Broker.Request.QueryOptionKey.SERVER_RETURN_FINAL_RESULT,
+              "true");
+        }
+        // there cannot be any further modification of the PinotQuery after the agg, thus this is the last possible boundary.
+        context.setLeafStageBoundaryNode(node);
+      }
+    }
     return null;
   }
 
   @Override
   public Void visitWindow(WindowNode node, ServerPlanRequestContext context) {
-    throw new UnsupportedOperationException("Window not yet supported!");
+    if (visit(node.getInputs().get(0), context)) {
+      // window node is not runnable on leaf, set the boundary to its input directly
+      context.setLeafStageBoundaryNode(node.getInputs().get(0));
+    }
+    return null;
   }
 
   @Override
   public Void visitSetOp(SetOpNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
+    if (visit(node.getInputs().get(0), context)) {
+      // set-op node is not runnable on leaf, set the boundary to its input directly
+      context.setLeafStageBoundaryNode(node.getInputs().get(0));
+    }
     return null;
   }
 
   @Override
   public Void visitExchange(ExchangeNode exchangeNode, ServerPlanRequestContext context) {
-    throw new UnsupportedOperationException("Exchange not yet supported!");
+    throw new UnsupportedOperationException("Leaf stage should not visit ExchangeNode!");
   }
 
   @Override
   public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    context.getPinotQuery()
-        .setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(), context.getPinotQuery()));
+    if (visit(node.getInputs().get(0), context)) {
+      PinotQuery pinotQuery = context.getPinotQuery();
+      if (pinotQuery.getFilterExpression() == null) {
+        pinotQuery.setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(), pinotQuery));
+      } else {
+        // if a filter was already applied, the leaf query cannot take another one.
+        context.setLeafStageBoundaryNode(node.getInputs().get(0));
+      }
+    }
     return null;
   }
 
@@ -109,62 +132,63 @@ public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPla
       dynamicSide = node.getInputs().get(0);
       staticSide = node.getInputs().get(1);
     }
-    staticSide.visit(this, context);
-    PipelineBreakerResult pipelineBreakerResult = context.getExecutionContext().getPipelineBreakerResult();
-    int resultMapId = pipelineBreakerResult.getNodeIdMap().get(dynamicSide);
-    List<TransferableBlock> transferableBlocks = pipelineBreakerResult.getResultMap().getOrDefault(
-        resultMapId, Collections.emptyList());
-    List<Object[]> resultDataContainer = new ArrayList<>();
-    DataSchema dataSchema = dynamicSide.getDataSchema();
-    for (TransferableBlock block : transferableBlocks) {
-      if (block.getType() == DataBlock.Type.ROW) {
-        resultDataContainer.addAll(block.getContainer());
+    if (visit(staticSide, context)) {
+      PipelineBreakerResult pipelineBreakerResult = context.getPipelineBreakerResult();
+      int resultMapId = pipelineBreakerResult.getNodeIdMap().get(dynamicSide);
+      List<TransferableBlock> transferableBlocks =
+          pipelineBreakerResult.getResultMap().getOrDefault(resultMapId, Collections.emptyList());
+      List<Object[]> resultDataContainer = new ArrayList<>();
+      DataSchema dataSchema = dynamicSide.getDataSchema();
+      for (TransferableBlock block : transferableBlocks) {
+        if (block.getType() == DataBlock.Type.ROW) {
+          resultDataContainer.addAll(block.getContainer());
+        }
       }
-    }
-
-    if (!resultDataContainer.isEmpty()) {
-      // rewrite SEMI-JOIN as filter clause.
       ServerPlanRequestUtils.attachDynamicFilter(context.getPinotQuery(), node.getJoinKeys(), resultDataContainer,
           dataSchema);
-    } else {
-      // do not pull any data out, this is constant false filter.
-      context.getPinotQuery().setLimit(0);
     }
     return null;
   }
 
   @Override
   public Void visitMailboxReceive(MailboxReceiveNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    return null;
+    throw new UnsupportedOperationException("Leaf stage should not visit MailboxReceiveNode!");
   }
 
   @Override
   public Void visitMailboxSend(MailboxSendNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
+    if (visit(node.getInputs().get(0), context)) {
+      context.setLeafStageBoundaryNode(node.getInputs().get(0));
+    }
     return null;
   }
 
   @Override
   public Void visitProject(ProjectNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    context.getPinotQuery()
-        .setSelectList(CalciteRexExpressionParser.convertProjectList(node.getProjects(), context.getPinotQuery()));
+    if (visit(node.getInputs().get(0), context)) {
+      PinotQuery pinotQuery = context.getPinotQuery();
+      pinotQuery.setSelectList(CalciteRexExpressionParser.convertProjectList(node.getProjects(), pinotQuery));
+    }
     return null;
   }
 
   @Override
   public Void visitSort(SortNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    PinotQuery pinotQuery = context.getPinotQuery();
-    if (!node.getCollationKeys().isEmpty()) {
-      pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(node, pinotQuery));
-    }
-    if (node.getFetch() >= 0) {
-      pinotQuery.setLimit(node.getFetch());
-    }
-    if (node.getOffset() >= 0) {
-      pinotQuery.setOffset(node.getOffset());
+    if (visit(node.getInputs().get(0), context)) {
+      PinotQuery pinotQuery = context.getPinotQuery();
+      if (pinotQuery.getOrderByList() == null) {
+        if (!node.getCollationKeys().isEmpty()) {
+          pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(node, pinotQuery));
+        }
+        if (node.getFetch() >= 0) {
+          pinotQuery.setLimit(node.getFetch());
+        }
+        if (node.getOffset() >= 0) {
+          pinotQuery.setOffset(node.getOffset());
+        }
+      } else {
+        context.setLeafStageBoundaryNode(node.getInputs().get(0));
+      }
     }
     return null;
   }
@@ -172,9 +196,10 @@ public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPla
   @Override
   public Void visitTableScan(TableScanNode node, ServerPlanRequestContext context) {
     DataSource dataSource = new DataSource();
-    String tableNameWithType = TableNameBuilder.forType(context.getTableType())
-        .tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName()));
-    dataSource.setTableName(tableNameWithType);
+    // construct the PinotQuery with the raw table name;
+    // it is later replaced with the actual table name with type.
+    String rawTableName = TableNameBuilder.extractRawTableName(node.getTableName());
+    dataSource.setTableName(rawTableName);
     context.getPinotQuery().setDataSource(dataSource);
     context.getPinotQuery().setSelectList(
         node.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
@@ -183,13 +208,11 @@ public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPla
 
   @Override
   public Void visitValue(ValueNode node, ServerPlanRequestContext context) {
-    visitChildren(node, context);
-    return null;
+    throw new UnsupportedOperationException("Leaf stage should not visit ValueNode!");
   }
 
-  private void visitChildren(PlanNode node, ServerPlanRequestContext context) {
-    for (PlanNode child : node.getInputs()) {
-      child.visit(this, context);
-    }
+  private boolean visit(PlanNode node, ServerPlanRequestContext context) {
+    node.visit(this, context);
+    return context.getLeafStageBoundaryNode() == null;
   }
 }
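
    For illustration only, not part of the patch: every visit method above funnels
    its child through a boolean helper, so the first node that cannot fold into the
    PinotQuery becomes the leaf-stage boundary and all of its ancestors stop folding.
    A minimal standalone sketch of the pattern, with hypothetical Node and Context
    types:

        // hypothetical types; the real code uses PlanNode and ServerPlanRequestContext
        interface Node {
          void accept(Context ctx);  // folds itself into the query, or sets the boundary
        }
        final class Context {
          Node _boundary;  // stays null until some node opts out of the leaf-stage query
        }
        final class BoundaryWalk {
          // keep folding parent nodes only while no boundary has been set underneath
          static boolean visit(Node node, Context ctx) {
            node.accept(ctx);
            return ctx._boundary == null;
          }
        }
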
diff --git a/pinot-query-runtime/src/test/resources/queries/FromExpressions.json b/pinot-query-runtime/src/test/resources/queries/FromExpressions.json
index 7e91824304..bdde6a38c9 100644
--- a/pinot-query-runtime/src/test/resources/queries/FromExpressions.json
+++ b/pinot-query-runtime/src/test/resources/queries/FromExpressions.json
@@ -136,6 +136,16 @@
         "description": "sub-query to semi-join syntax with star results, using IN clause",
         "sql": "SELECT * FROM {tbl1} WHERE num IN (SELECT num FROM {tbl2})"
       },
+      {
+        "psql": "7.2.1.3",
+        "description": "sub-query to semi-join syntax with star results, using IN clause multiple times",
+        "sql": "SELECT * FROM {tbl1} WHERE num IN (SELECT num FROM {tbl2}) AND name IN (SELECT val FROM {tbl2} WHERE num = 3)"
+      },
+      {
+        "psql": "7.2.1.3",
+        "description": "sub-query to semi-join syntax with star results, using IN clause multiple times with grouping",
+        "sql": "SELECT num, COUNT(*) FROM {tbl1} WHERE num IN (SELECT num FROM {tbl2}) AND name IN (SELECT val FROM {tbl2} WHERE num = 3) GROUP BY num"
+      },
       {
         "psql": "7.2.1.3",
         "description": "sub-query to multiple anti semi-join queries with star results, using NOT IN clause",
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index 84079a33eb..cd4856a7e7 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -69,6 +69,10 @@
         "description": "Group by partition column with partition parallelism",
         "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4', partition_parallelism='2') */ GROUP BY {tbl1}.num"
       },
+      {
+        "description": "Skip leaf stage aggregation with GROUP BY hint",
+        "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ {tbl1}.name, COUNT(*), SUM({tbl1}.num), MIN({tbl1}.num) FROM {tbl1} WHERE {tbl1}.num >= 0 GROUP BY {tbl1}.name"
+      },
       {
         "description": "Colocated JOIN with partition column",
         "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num"
@@ -112,10 +116,6 @@
       {
         "description": "Colocated, Dynamic broadcast SEMI-JOIN with partially empty right table result for some servers",
         "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val = 'z') GROUP BY {tbl1}.name"
-      },
-      {
-        "description": "Skip leaf stage aggregation with GROUP BY hint",
-        "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ {tbl1}.name, COUNT(*), SUM({tbl1}.num), MIN({tbl1}.num) FROM {tbl1} WHERE {tbl1}.num >= 0 GROUP BY {tbl1}.name"
       }
     ]
   },
@@ -194,5 +194,182 @@
         "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val = 'z') GROUP BY {tbl1}.name"
       }
     ]
+  },
+  "hint_option_join_queries": {
+    "comments": "this section specifically dealing with mixing INNER-JOIN/SEMI-JOIN coupled with SORT/AGG on top or partition table scan on bottom",
+    "tables": {
+      "tbl1": {
+        "schema": [
+          {"name": "num", "type": "INT"},
+          {"name": "name", "type": "STRING"},
+          {"name": "val", "type": "LONG"}
+        ],
+        "inputs": [
+          [1, "a", 23],
+          [2, "b", 34],
+          [3, "c", 45],
+          [3, "yyy", 56],
+          [4, "e", 12],
+          [4, "e", 23],
+          [6, "e", 34],
+          [7, "d", 45],
+          [7, "f", 56],
+          [8, "z", 67]
+        ],
+        "partitionColumns": [
+          "num"
+        ]
+      },
+      "tbl2": {
+        "schema": [
+          {"name": "num", "type": "INT"},
+          {"name": "id", "type": "STRING"},
+          {"name": "data", "type": "INT"}
+        ],
+        "inputs": [
+          [1, "xxx", 23],
+          [1, "xxx", 34],
+          [3, "yyy", 45],
+          [3, "zzz", 56],
+          [5, "zzz", 12],
+          [6, "e", 23],
+          [7, "d", 34],
+          [8, "z", 45]
+        ],
+        "partitionColumns": [
+          "num"
+        ]
+      },
+      "tbl3": {
+        "schema": [
+          {"name": "num", "type": "INT"},
+          {"name": "id", "type": "STRING"},
+          {"name": "data", "type": "INT"}
+        ],
+        "inputs": [
+          [1, "xxx", 12],
+          [2, "xxx", 23],
+          [3, "yyy", 34],
+          [4, "yyy", 45],
+          [5, "zzz", 12]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "Inner join with group by",
+        "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.name, AVG({tbl2}.data) FROM {tbl1} JOIN {tbl2} ON {tbl1}.name = {tbl2}.id  WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' AND {tbl2}.data < 0 GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "semi-join with dynamic_broadcast join strategy",
+        "sql": "SELECT {tbl1}.name, {tbl1}.num FROM {tbl1} WHERE {tbl1}.name IN (SELECT id FROM {tbl2} WHERE {tbl2}.data > 0)"
+      },
+      {
+        "description": "semi-join with multi-ple dynamic_broadcast join strategy then group-by on same key",
+        "sql": "SELECT {tbl1}.name, {tbl1}.num FROM {tbl1} WHERE {tbl1}.name IN (SELECT id FROM {tbl2} WHERE {tbl2}.data > 0) AND {tbl1}.num IN (select num FROM {tbl3} WHERE {tbl3}.data > 0)"
+      },
+      {
+        "description": "semi-join with multi-ple dynamic_broadcast join strategy then group-by",
+        "sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} WHERE {tbl1}.name IN (SELECT id FROM {tbl2} WHERE {tbl2}.data > 0) AND {tbl1}.num IN (select num FROM {tbl3} WHERE {tbl3}.data > 0) GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "semi-join with dynamic_broadcast join strategy then group-by on same key",
+        "sql": "SELECT /*+ aggOptionsInternal(agg_type='DIRECT') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} WHERE {tbl1}.name IN (SELECT id FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "semi-join with dynamic_broadcast join strategy then group-by on different key",
+        "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} WHERE {tbl1}.name IN (SELECT id FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "aggregate with skip leaf stage hint, group by aggregate",
+        "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ {tbl1}.num, {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' GROUP BY {tbl1}.name, {tbl1}.num"
+      },
+      {
+        "description": "aggregate with skip leaf stage hint, group by aggregate with having clause",
+        "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ {tbl1}.num, COUNT(*), SUM({tbl1}.val), SUM({tbl1}.num) FROM {tbl1} WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' GROUP BY {tbl1}.num HAVING COUNT(*) > 10 AND MAX({tbl1}.val) >= 0 AND MIN({tbl1}.val) < 20 AND SUM({tbl1}.val) <= 10 AND AVG({tbl1}.val) = 5"
+      },
+      {
+        "description": "aggregate with skip intermediate stage hint (via hinting the leaf stage group by as final stage_",
+        "sql": "SELECT /*+ aggOptionsInternal(agg_type='DIRECT') */ {tbl1}.num, COUNT(*), SUM({tbl1}.val), SUM({tbl1}.num) FROM {tbl1} WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' GROUP BY {tbl1}.num HAVING COUNT(*) > 10"
+      },
+      {
+        "description": "aggregate with skip leaf stage hint (via hint option is_partitioned_by_group_by_keys",
+        "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(*), SUM({tbl1}.val), SUM({tbl1}.num) FROM {tbl1} WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "join with colocated tables",
+        "sql": "SELECT {tbl1}.num, {tbl1}.val, {tbl2}.data FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0"
+      },
+      {
+        "description": "group by with pre-partitioned tables on partition column",
+        "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "group by with pre-partitioned tables on non-partition column",
+        "ignored": true,
+        "comment": "tableOption usage implies direct exchange (e.g. without shuffle) this should be fix in the future",
+        "sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "join with colocated tables then group-by left join key column",
+        "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "join with colocated tables then group-by left join key column and other columns",
+        "sql": "SELECT {tbl1}.num, {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num, {tbl2}.id"
+      },
+      {
+        "description": "join with colocated tables then group-by other columns",
+        "sql": "SELECT {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl2}.id"
+      },
+      {
+        "description": "agg + semi-join on colocated tables then group by on partition column",
+        "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main tables then group by on partition column",
+        "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main tables then group by on non-partitioned column",
+        "sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main tables with group by on partitioned column on semi table",
+        "sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0 GROUP BY num HAVING COUNT(*) > 1) GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join",
+        "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join",
+        "sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.name ORDER BY SUM({tbl1}.val) DESC"
+      },
+      {
+        "description": "co-partition agg + semi-join with colocated tables & agg hint",
+        "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "co-partition agg + semi-join with single table partition & agg hint",
+        "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "co-partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join",
+        "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
+      },
+      {
+        "description": "co-partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join colocated on partition key",
+        "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC"
+      },
+      {
+        "description": "semi-join on pre-partitioned main tables with sorting on top of semi join on join key",
+        "sql": "SELECT {tbl1}.num FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) ORDER BY {tbl1}.num DESC"
+      },
+      {
+        "description": "semi-join on pre-partitioned main tables with sorting on top of semi join with non-join columns",
+        "sql": "SELECT {tbl1}.name, {tbl1}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) ORDER BY {tbl1}.val DESC"
+      }
+    ]
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org