Posted to commits@pinot.apache.org by ro...@apache.org on 2023/11/17 16:48:40 UTC

(pinot) branch master updated: [multistage][feature] support RelDistribution trait planning (#11976)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 09da0eac74 [multistage][feature] support RelDistribution trait planning (#11976)
09da0eac74 is described below

commit 09da0eac748d15dee53449c517e83c862dc238b6
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Nov 17 08:48:34 2023 -0800

    [multistage][feature] support RelDistribution trait planning (#11976)
    
    * trait rule to attach rel-distribution to RelNode
        * explicitly mark MailboxSendNode as partitioned in explain, making it easy to spot
        * add back 'is_colocated_by_join_keys' option (see the example query after this message)
    * adjust rules for partition worker/mailbox assignment
    * enhanced physical explain to now include partition info and inlined mailbox send/receive naming
    * testing
        * fix test physical plans that currently don't contain a leaf boundary exchange
        * add more runtime tests to ensure the results are correct after the hinted hash/partition exchange works
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
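    
    For illustration, a sketch of the kind of query this change targets: the partition
    table hint and the colocated-join option combined on a join keyed by the partition
    columns. The 'tableOptions' hint and the table/column names are taken from the test
    files in this diff; the 'joinOptions' hint group name for 'is_colocated_by_join_keys'
    is an assumption here, not an exact test case.
    
        SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */
               a.col2, a.col3, b.col3
        FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */
        JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */
          ON a.col2 = b.col1
        WHERE b.col3 > 0
    
    With both sides partitioned on the join key, the mailbox send on the hinted side can
    show up as MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED] in the physical explain (see the
    ExplainPhysicalPlans.json changes below).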
---
 .../apache/calcite/rel/hint/PinotHintOptions.java  |   4 +
 .../rel/rules/PinotJoinToDynamicBroadcastRule.java |  11 +-
 .../rel/rules/PinotRelDistributionTraitRule.java   | 177 +++++++++
 .../org/apache/pinot/query/QueryEnvironment.java   | 102 +++--
 .../apache/pinot/query/context/PlannerContext.java |  13 +-
 .../query/planner/PhysicalExplainPlanVisitor.java  |  55 ++-
 .../query/planner/logical/LogicalPlanner.java      |   6 +-
 .../planner/logical/PinotLogicalQueryPlanner.java  |   2 +-
 .../query/planner/logical/PlanFragmenter.java      |   3 +-
 .../planner/logical/RelToPlanNodeConverter.java    |   7 +-
 .../planner/physical/DispatchablePlanMetadata.java |  12 +-
 .../planner/physical/DispatchablePlanVisitor.java  |   3 +-
 .../planner/physical/MailboxAssignmentVisitor.java |   3 +-
 .../pinot/query/planner/plannode/ExchangeNode.java |  10 +-
 .../query/planner/plannode/MailboxSendNode.java    |  21 +-
 .../apache/pinot/query/routing/WorkerManager.java  |   4 +-
 .../apache/pinot/query/QueryCompilationTest.java   |  12 +-
 .../resources/queries/ExplainPhysicalPlans.json    | 414 ++++++++++++++++++---
 .../src/test/resources/queries/QueryHints.json     | 196 +++++++++-
 19 files changed, 916 insertions(+), 139 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
index ba92d43741..b97d3107bc 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
@@ -80,6 +80,10 @@ public class PinotHintOptions {
      *   BREAK: Break right table build process, continue to perform JOIN operation, results might be partial.
      */
     public static final String JOIN_OVERFLOW_MODE = "join_overflow_mode";
+    /**
+     * Indicates that the join operator(s) within a certain selection scope are colocated.
+     */
+    public static final String IS_COLOCATED_BY_JOIN_KEYS = "is_colocated_by_join_keys";
   }
 
   public static class TableHintOptions {
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
index e4cc9852c0..1e80585c7d 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
@@ -155,8 +155,15 @@ public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
     PinotLogicalExchange right = (PinotLogicalExchange) (join.getRight() instanceof HepRelVertex
         ? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());
 
-    PinotLogicalExchange dynamicBroadcastExchange =
-        PinotLogicalExchange.create(right.getInput(), RelDistributions.BROADCAST_DISTRIBUTED,
+    // when colocated join hint is given, dynamic broadcast exchange can be hash-distributed b/c
+    //    1. currently, dynamic broadcast only works against the main table off the leaf stage (e.g. receive node on leaf);
+    //    2. when the hash keys are the same but the hash functions differ, it can be done via a normal hash shuffle.
+    boolean isColocatedJoin = PinotHintStrategyTable.isHintOptionTrue(join.getHints(),
+        PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
+    PinotLogicalExchange dynamicBroadcastExchange = isColocatedJoin
+        ? PinotLogicalExchange.create(right.getInput(), RelDistributions.hash(join.analyzeCondition().rightKeys),
+        PinotRelExchangeType.PIPELINE_BREAKER)
+        : PinotLogicalExchange.create(right.getInput(), RelDistributions.BROADCAST_DISTRIBUTED,
             PinotRelExchangeType.PIPELINE_BREAKER);
     Join dynamicFilterJoin =
         new LogicalJoin(join.getCluster(), join.getTraitSet(), left.getInput(), dynamicBroadcastExchange,
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRelDistributionTraitRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRelDistributionTraitRule.java
new file mode 100644
index 0000000000..71008e4fd9
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRelDistributionTraitRule.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.hint.PinotHintOptions;
+import org.apache.calcite.rel.hint.PinotHintStrategyTable;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.mapping.Mappings;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+
+
+/**
+ * Special rule for Pinot, this rule populates {@link RelDistribution} across the entire relational tree.
+ *
+ * We implement this rule as a workaround because {@link org.apache.calcite.plan.RelTraitPropagationVisitor} is
+ * deprecated. The idea is to associate every node with a RelDistribution derived from {@link RelNode#getInputs()}
+ * or from the node itself (via hints, or special handling of the type of node in question).
+ */
+public class PinotRelDistributionTraitRule extends RelOptRule {
+  public static final PinotRelDistributionTraitRule INSTANCE =
+      new PinotRelDistributionTraitRule(PinotRuleUtils.PINOT_REL_FACTORY);
+
+  public PinotRelDistributionTraitRule(RelBuilderFactory factory) {
+    super(operand(RelNode.class, any()));
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    return call.rels.length >= 1;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    RelNode current = call.rel(0);
+    List<RelNode> inputs = current.getInputs();
+    RelDistribution relDistribution;
+
+    if (inputs == null || inputs.size() == 0) {
+      relDistribution = computeCurrentDistribution(current);
+    } else {
+      // if there's input to the current node, attempt to derive the RelDistribution.
+      relDistribution = deriveDistribution(current);
+    }
+    call.transformTo(attachTrait(current, relDistribution));
+  }
+
+  /**
+   * Currently, Pinot's default {@link RelTraitSet} is empty, so we pull the cluster trait set directly and
+   * then add the {@link RelDistribution} trait.
+   */
+  private static RelNode attachTrait(RelNode relNode, RelTrait trait) {
+    RelTraitSet clusterTraitSet = relNode.getCluster().traitSet();
+    if (relNode instanceof LogicalJoin) {
+      // work around {@link LogicalJoin#copy(RelTraitSet, RexNode, RelNode, RelNode, JoinRelType, boolean)} not copying
+      // properly
+      LogicalJoin join = (LogicalJoin) relNode;
+      return new LogicalJoin(join.getCluster(), clusterTraitSet.plus(trait), join.getLeft(),
+          join.getRight(), join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
+          ImmutableList.copyOf(join.getSystemFieldList()));
+    } else if (relNode instanceof LogicalTableScan) {
+      LogicalTableScan tableScan = (LogicalTableScan) relNode;
+      return new LogicalTableScan(tableScan.getCluster(), clusterTraitSet.plus(trait), tableScan.getTable());
+    } else {
+      return relNode.copy(clusterTraitSet.plus(trait), relNode.getInputs());
+    }
+  }
+
+  private static RelDistribution deriveDistribution(RelNode node) {
+    List<RelNode> inputs = node.getInputs();
+    RelNode input = PinotRuleUtils.unboxRel(inputs.get(0));
+    if (node instanceof PinotLogicalExchange) {
+      // TODO: derive from input first, only if the result is ANY we change it to current
+      return computeCurrentDistribution(node);
+    } else if (node instanceof LogicalProject) {
+      assert inputs.size() == 1;
+      RelDistribution inputRelDistribution = input.getTraitSet().getDistribution();
+      LogicalProject project = (LogicalProject) node;
+      try {
+        if (inputRelDistribution != null) {
+          return inputRelDistribution.apply(project.getMapping());
+        }
+      } catch (Exception e) {
+        // ... skip;
+      }
+    } else if (node instanceof LogicalFilter) {
+      assert inputs.size() == 1;
+      RelDistribution inputRelDistribution = input.getTraitSet().getDistribution();
+      if (inputRelDistribution != null) {
+        return inputRelDistribution;
+      }
+    } else if (node instanceof LogicalAggregate) {
+      assert inputs.size() == 1;
+      RelDistribution inputRelDistribution = inputs.get(0).getTraitSet().getDistribution();
+      if (inputRelDistribution != null) {
+        // create a mapping that only contains the group set
+        LogicalAggregate agg = (LogicalAggregate) node;
+        List<Integer> groupSetIndices = new ArrayList<>();
+        agg.getGroupSet().forEach(groupSetIndices::add);
+        return inputRelDistribution.apply(Mappings.target(groupSetIndices, input.getRowType().getFieldCount()));
+      }
+    } else if (node instanceof LogicalJoin) {
+      // TODO: we only map a single RelTrait from the LEFT table, later we should support RIGHT table as well
+      assert inputs.size() == 2;
+      RelDistribution inputRelDistribution = inputs.get(0).getTraitSet().getDistribution();
+      if (inputRelDistribution != null) {
+        // Since we only support LEFT RelTrait propagation, the inputRelDistribution can directly be applied
+        // b/c the Join node always puts left relation RowTypes then right relation RowTypes sequentially.
+        return inputRelDistribution;
+      }
+    }
+    // TODO: add the rest of the nodes.
+    return computeCurrentDistribution(node);
+  }
+
+  private static RelDistribution computeCurrentDistribution(RelNode node) {
+    if (node instanceof Exchange) {
+      return ((Exchange) node).getDistribution();
+    } else if (node instanceof LogicalTableScan) {
+      LogicalTableScan tableScan = (LogicalTableScan) node;
+      // convert table scan hints into rel trait
+      String partitionKey =
+          PinotHintStrategyTable.getHintOption(tableScan.getHints(), PinotHintOptions.TABLE_HINT_OPTIONS,
+              PinotHintOptions.TableHintOptions.PARTITION_KEY);
+      if (partitionKey != null) {
+        int partitionIndex = tableScan.getRowType().getField(partitionKey, true, true).getIndex();
+        return RelDistributions.hash(ImmutableList.of(partitionIndex));
+      } else {
+        return RelDistributions.of(RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistributions.EMPTY);
+      }
+    } else if (node instanceof LogicalAggregate) {
+      LogicalAggregate agg = (LogicalAggregate) node;
+      AggregateNode.AggType aggType = AggregateNode.AggType.valueOf(PinotHintStrategyTable.getHintOption(agg.getHints(),
+          PinotHintOptions.INTERNAL_AGG_OPTIONS, PinotHintOptions.InternalAggregateOptions.AGG_TYPE));
+      if (aggType == AggregateNode.AggType.FINAL || aggType == AggregateNode.AggType.DIRECT) {
+        List<Integer> groupSetIndices = new ArrayList<>();
+        agg.getGroupSet().forEach(groupSetIndices::add);
+        return RelDistributions.hash(groupSetIndices);
+      } else {
+        return RelDistributions.of(RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistributions.EMPTY);
+      }
+    }
+    return RelDistributions.of(RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistributions.EMPTY);
+  }
+}
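
For context, a sketch of how the LogicalTableScan and LogicalAggregate branches above are driven by the partition table hint; the query shape mirrors the test files in this change (table/column names taken from them):

    EXPLAIN IMPLEMENTATION PLAN FOR
    SELECT col2, COUNT(*)
    FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */
    GROUP BY col2

Here the table scan picks up a hash distribution on col2 from the PARTITION_KEY hint, and deriveDistribution() maps that trait through the aggregate's group set, so downstream exchanges can recognize the data as already partitioned.
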
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 4ac42a4306..fc210ed26c 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.calcite.config.CalciteConnectionConfigImpl;
 import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.hep.HepMatchOrder;
@@ -42,6 +43,7 @@ import org.apache.calcite.rel.hint.HintStrategyTable;
 import org.apache.calcite.rel.hint.PinotHintStrategyTable;
 import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.rules.PinotQueryRuleSets;
+import org.apache.calcite.rel.rules.PinotRelDistributionTraitRule;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.runtime.CalciteContextException;
@@ -89,7 +91,8 @@ public class QueryEnvironment {
   private final Prepare.CatalogReader _catalogReader;
   private final RelDataTypeFactory _typeFactory;
 
-  private final HepProgram _hepProgram;
+  private final HepProgram _optProgram;
+  private final HepProgram _traitProgram;
 
   // Pinot extensions
   private final WorkerManager _workerManager;
@@ -121,38 +124,8 @@ public class QueryEnvironment {
             .addRelBuilderConfigTransform(c -> c.withPushJoinCondition(true))
             .addRelBuilderConfigTransform(c -> c.withAggregateUnique(true)))
         .build();
-
-    HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
-    // Set the match order as DEPTH_FIRST. The default is arbitrary which works the same as DEPTH_FIRST, but it's
-    // best to be explicit.
-    hepProgramBuilder.addMatchOrder(HepMatchOrder.DEPTH_FIRST);
-
-    // ----
-    // Run the Calcite CORE rules using 1 HepInstruction per rule. We use 1 HepInstruction per rule for simplicity:
-    // the rules used here can rest assured that they are the only ones evaluated in a dedicated graph-traversal.
-    for (RelOptRule relOptRule : PinotQueryRuleSets.BASIC_RULES) {
-      hepProgramBuilder.addRuleInstance(relOptRule);
-    }
-
-    // ----
-    // Run Pinot rule to attach aggregation auxiliary info
-    hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PINOT_AGG_PROCESS_RULES);
-
-    // ----
-    // Pushdown filters using a single HepInstruction.
-    hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES);
-
-    // ----
-    // Prune duplicate/unnecessary nodes using a single HepInstruction.
-    // TODO: We can consider using HepMatchOrder.TOP_DOWN if we find cases where it would help.
-    hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PRUNE_RULES);
-
-    // ----
-    // Run pinot specific rules that should run after all other rules, using 1 HepInstruction per rule.
-    for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES) {
-      hepProgramBuilder.addRuleInstance(relOptRule);
-    }
-    _hepProgram = hepProgramBuilder.build();
+    _optProgram = getOptProgram();
+    _traitProgram = getTraitProgram();
   }
 
   /**
@@ -168,7 +141,8 @@ public class QueryEnvironment {
    * @return QueryPlannerResult containing the dispatchable query plan and the relRoot.
    */
   public QueryPlannerResult planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions, long requestId) {
-    try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
+    try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram,
+        _traitProgram)) {
       plannerContext.setOptions(sqlNodeAndOptions.getOptions());
       RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext);
       // TODO: current code only assume one SubPlan per query, but we should support multiple SubPlans per query.
@@ -196,7 +170,8 @@ public class QueryEnvironment {
    * @return QueryPlannerResult containing the explained query plan and the relRoot.
    */
   public QueryPlannerResult explainQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions, long requestId) {
-    try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
+    try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram,
+        _traitProgram)) {
       SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
       plannerContext.setOptions(sqlNodeAndOptions.getOptions());
       RelRoot relRoot = compileQuery(explain.getExplicandum(), plannerContext);
@@ -229,7 +204,8 @@ public class QueryEnvironment {
   }
 
   public List<String> getTableNamesForQuery(String sqlQuery) {
-    try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
+    try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram,
+        _traitProgram)) {
       SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery).getSqlNode();
       if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
         sqlNode = ((SqlExplain) sqlNode).getExplicandum();
@@ -335,8 +311,12 @@ public class QueryEnvironment {
     // 4. optimize relNode
     // TODO: add support for traits, cost factory.
     try {
-      plannerContext.getRelOptPlanner().setRoot(relRoot.rel);
-      return plannerContext.getRelOptPlanner().findBestExp();
+      RelOptPlanner optPlanner = plannerContext.getRelOptPlanner();
+      optPlanner.setRoot(relRoot.rel);
+      RelNode optimized = optPlanner.findBestExp();
+      RelOptPlanner traitPlanner = plannerContext.getRelTraitPlanner();
+      traitPlanner.setRoot(optimized);
+      return traitPlanner.findBestExp();
     } catch (Exception e) {
       throw new UnsupportedOperationException(
           "Cannot generate a valid execution plan for the given query: " + RelOptUtil.toString(relRoot.rel), e);
@@ -364,4 +344,50 @@ public class QueryEnvironment {
   private HintStrategyTable getHintStrategyTable() {
     return PinotHintStrategyTable.PINOT_HINT_STRATEGY_TABLE;
   }
+
+  private static HepProgram getOptProgram() {
+    HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
+    // Set the match order as DEPTH_FIRST. The default is arbitrary which works the same as DEPTH_FIRST, but it's
+    // best to be explicit.
+    hepProgramBuilder.addMatchOrder(HepMatchOrder.DEPTH_FIRST);
+
+    // ----
+    // Run the Calcite CORE rules using 1 HepInstruction per rule. We use 1 HepInstruction per rule for simplicity:
+    // the rules used here can rest assured that they are the only ones evaluated in a dedicated graph-traversal.
+    for (RelOptRule relOptRule : PinotQueryRuleSets.BASIC_RULES) {
+      hepProgramBuilder.addRuleInstance(relOptRule);
+    }
+
+    // ----
+    // Run Pinot rule to attach aggregation auxiliary info
+    hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PINOT_AGG_PROCESS_RULES);
+
+    // ----
+    // Pushdown filters using a single HepInstruction.
+    hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES);
+
+    // ----
+    // Prune duplicate/unnecessary nodes using a single HepInstruction.
+    // TODO: We can consider using HepMatchOrder.TOP_DOWN if we find cases where it would help.
+    hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PRUNE_RULES);
+    return hepProgramBuilder.build();
+  }
+
+  private static HepProgram getTraitProgram() {
+    HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
+
+    // Set the match order as BOTTOM_UP.
+    hepProgramBuilder.addMatchOrder(HepMatchOrder.BOTTOM_UP);
+
+    // ----
+    // Run pinot specific rules that should run after all other rules, using 1 HepInstruction per rule.
+    for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES) {
+      hepProgramBuilder.addRuleInstance(relOptRule);
+    }
+
+    // apply RelDistribution trait to all nodes
+    hepProgramBuilder.addRuleInstance(PinotRelDistributionTraitRule.INSTANCE);
+
+    return hepProgramBuilder.build();
+  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
index 564fc17468..69a2682afa 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pinot.query.context;
 
+import java.util.Collections;
 import java.util.Map;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.hep.HepProgram;
 import org.apache.calcite.prepare.PlannerImpl;
 import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.RelDistributionTraitDef;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.tools.FrameworkConfig;
@@ -43,14 +45,17 @@ public class PlannerContext implements AutoCloseable {
   private final SqlValidator _validator;
 
   private final RelOptPlanner _relOptPlanner;
+  private final LogicalPlanner _relTraitPlanner;
 
   private Map<String, String> _options;
 
   public PlannerContext(FrameworkConfig config, Prepare.CatalogReader catalogReader, RelDataTypeFactory typeFactory,
-      HepProgram hepProgram) {
+      HepProgram optProgram, HepProgram traitProgram) {
     _planner = new PlannerImpl(config);
     _validator = new Validator(config.getOperatorTable(), catalogReader, typeFactory);
-    _relOptPlanner = new LogicalPlanner(hepProgram, Contexts.EMPTY_CONTEXT);
+    _relOptPlanner = new LogicalPlanner(optProgram, Contexts.EMPTY_CONTEXT, config.getTraitDefs());
+    _relTraitPlanner = new LogicalPlanner(traitProgram, Contexts.EMPTY_CONTEXT,
+        Collections.singletonList(RelDistributionTraitDef.INSTANCE));
   }
 
   public PlannerImpl getPlanner() {
@@ -65,6 +70,10 @@ public class PlannerContext implements AutoCloseable {
     return _relOptPlanner;
   }
 
+  public LogicalPlanner getRelTraitPlanner() {
+    return _relTraitPlanner;
+  }
+
   public void setOptions(Map<String, String> options) {
     _options = options;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
index 245f531522..c51d24f5d3 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PhysicalExplainPlanVisitor.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pinot.query.planner;
 
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.apache.pinot.query.planner.plannode.ExchangeNode;
@@ -37,6 +40,7 @@ import org.apache.pinot.query.planner.plannode.TableScanNode;
 import org.apache.pinot.query.planner.plannode.ValueNode;
 import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.routing.VirtualServerAddress;
 
 
 /**
@@ -92,6 +96,15 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
         .toString();
   }
 
+  /**
+   * This wrapper prints out contextual info from {@link Context} before invoking {@link PlanNode#explain()}.
+   * The format of the contextual info is always:
+   *   "`PREFIX`[`FRAGMENT_ID`]@`HOSTNAME`:`PORT`|[`WORKER_ID`(s)] `EXPLAIN`"
+   *
+   * @param node the {@link PlanNode} to be explained
+   * @param context the {@link Context} to be wrapped in front of the plan node explain.
+   * @return stringified format of the explained result, wrapped with contextual info.
+   */
   private StringBuilder appendInfo(PlanNode node, Context context) {
     int planFragmentId = node.getPlanFragmentId();
     context._builder
@@ -102,7 +115,9 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
         .append(context._host.getHostname())
         .append(':')
         .append(context._host.getQueryServicePort())
-        .append(' ')
+        .append("|[")
+        .append(context._workerId)
+        .append("] ")
         .append(node.explain());
     return context._builder;
   }
@@ -185,16 +200,26 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
     return node.getInputs().get(0).visit(this, context.next(false, context._host, context._workerId));
   }
 
+  /**
+   * Print out mailbox sending info.
+   *
+   * Note that when printing out mailbox sending info, the receiving side follows the contextual info format defined in
+   * {@link PhysicalExplainPlanVisitor#appendInfo(PlanNode, Context)}.
+   *
+   * e.g. the RECEIVERs are printed as:
+   *   "{[`FRAGMENT_ID`]@`HOSTNAME`:`PORT`|[`WORKER_ID`(s)]}" and are comma-separated.
+   */
   private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) {
     appendInfo(node, context);
 
     int receiverStageId = node.getReceiverStageId();
-    Map<QueryServerInstance, List<Integer>> servers =
-        _dispatchableSubPlan.getQueryStageList().get(receiverStageId)
-            .getServerInstanceToWorkerIdMap();
+    List<VirtualServerAddress> serverAddressList =
+        _dispatchableSubPlan.getQueryStageList().get(node.getPlanFragmentId())
+            .getWorkerMetadataList().get(context._workerId)
+            .getMailBoxInfosMap().get(receiverStageId).getVirtualAddressList();
+    List<String> serverInstanceToWorkerIdList = stringifyVirtualServerAddresses(serverAddressList);
     context._builder.append("->");
-    String receivers = servers.entrySet().stream()
-        .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+    String receivers = serverInstanceToWorkerIdList.stream()
         .map(s -> "[" + receiverStageId + "]@" + s)
         .collect(Collectors.joining(",", "{", "}"));
     return context._builder.append(receivers);
@@ -252,7 +277,23 @@ public class PhysicalExplainPlanVisitor implements PlanNodeVisitor<StringBuilder
     }
   }
 
+  public static List<String> stringifyVirtualServerAddresses(List<VirtualServerAddress> serverAddressList) {
+    // using tree map to ensure print order.
+    Map<QueryServerInstance, List<Integer>> serverToWorkerIdMap = new TreeMap<>(
+        Comparator.comparing(QueryServerInstance::toString));
+    for (VirtualServerAddress serverAddress : serverAddressList) {
+      QueryServerInstance server = new QueryServerInstance(serverAddress.hostname(), serverAddress.port(), -1);
+      List<Integer> workerIds = serverToWorkerIdMap.getOrDefault(server, new ArrayList<>());
+      workerIds.add(serverAddress.workerId());
+      serverToWorkerIdMap.put(server, workerIds);
+    }
+    return serverToWorkerIdMap.entrySet().stream()
+        .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
+        .collect(Collectors.toList());
+  }
+
   public static String stringifyQueryServerInstanceToWorkerIdsEntry(Map.Entry<QueryServerInstance, List<Integer>> e) {
-    return e.getKey() + "|" + e.getValue();
+    QueryServerInstance server = e.getKey();
+    return server.getHostname() + ":" + server.getQueryServicePort() + "|" + e.getValue();
   }
 }
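
For illustration, with the worker id and the inlined receiver addresses, a sender line in the enhanced physical explain now reads like the following (adapted from the ExplainPhysicalPlans.json expectations further down):

    [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]}
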
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LogicalPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LogicalPlanner.java
index 0e317560e9..e8dd96f7dd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LogicalPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LogicalPlanner.java
@@ -35,8 +35,12 @@ public class LogicalPlanner extends HepPlanner {
   private List<RelTraitDef> _traitDefs;
 
   public LogicalPlanner(HepProgram program, Context context) {
+    this(program, context, new ArrayList<>());
+  }
+
+  public LogicalPlanner(HepProgram program, Context context, List<RelTraitDef> traitDefs) {
     super(program, context);
-    _traitDefs = new ArrayList();
+    _traitDefs = traitDefs;
   }
 
   @Override
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
index 07df7845b8..4b924316e7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
@@ -100,7 +100,7 @@ public class PinotLogicalQueryPlanner {
       PlanNode subPlanRootSenderNode =
           new MailboxSendNode(subPlanRoot.getPlanFragmentId(), subPlanRoot.getDataSchema(), 0,
               RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
-              false);
+              false, false);
       subPlanRootSenderNode.addInput(subPlanRoot);
       PlanFragment planFragment1 =
           new PlanFragment(1, subPlanRootSenderNode, new PlanFragmentMetadata(), new ArrayList<>());
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
index 9d7e5a2fdd..269db82f07 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
@@ -159,7 +159,8 @@ public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.
         distributionType == RelDistribution.Type.HASH_DISTRIBUTED ? node.getDistributionKeys() : null;
     MailboxSendNode mailboxSendNode =
         new MailboxSendNode(senderPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), receiverPlanFragmentId,
-            distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender());
+            distributionType, exchangeType, distributionKeys, node.getCollations(), node.isSortOnSender(),
+            node.isPartitioned());
     mailboxSendNode.addInput(nextPlanFragmentRoot);
     _planFragmentMap.put(senderPlanFragmentId,
         new PlanFragment(senderPlanFragmentId, mailboxSendNode, new PlanFragmentMetadata(), new ArrayList<>()));
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index b9e00711af..03d8ec957b 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Exchange;
@@ -125,13 +126,17 @@ public final class RelToPlanNodeConverter {
         exchangeType = ((PinotLogicalExchange) node).getExchangeType();
       }
     }
+    RelDistribution inputDistributionTrait = node.getInputs().get(0).getTraitSet().getDistribution();
+    boolean isPartitioned = inputDistributionTrait != null
+        && inputDistributionTrait.getType() == RelDistribution.Type.HASH_DISTRIBUTED
+        && inputDistributionTrait == node.getDistribution();
     List<RelFieldCollation> fieldCollations = (collation == null) ? null : collation.getFieldCollations();
 
     // Compute all the tables involved under this exchange node
     Set<String> tableNames = getTableNamesFromRelRoot(node);
 
     return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()), exchangeType, tableNames,
-        node.getDistribution(), fieldCollations, isSortOnSender, isSortOnReceiver);
+        node.getDistribution(), fieldCollations, isSortOnSender, isSortOnReceiver, isPartitioned);
   }
 
   private static PlanNode convertLogicalSetOp(SetOp node, int currentStageId) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index 48378d4e90..fb57c2c3d1 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -68,8 +68,8 @@ public class DispatchablePlanMetadata implements Serializable {
   // whether a stage requires singleton instance to execute, e.g. stage contains global reduce (sort/agg) operator.
   private boolean _requiresSingletonInstance;
 
-  // whether a stage is partitioned table scan
-  private boolean _isPartitionedTableScan;
+  // whether a stage is partitioned in the same way the sending exchange desires
+  private boolean _isPartitioned;
   private int _partitionParallelism;
 
   public DispatchablePlanMetadata() {
@@ -136,12 +136,12 @@ public class DispatchablePlanMetadata implements Serializable {
     _requiresSingletonInstance = _requiresSingletonInstance || newRequireInstance;
   }
 
-  public boolean isPartitionedTableScan() {
-    return _isPartitionedTableScan;
+  public boolean isPartitioned() {
+    return _isPartitioned;
   }
 
-  public void setPartitionedTableScan(boolean isPartitionedTableScan) {
-    _isPartitionedTableScan = isPartitionedTableScan;
+  public void setPartitioned(boolean isPartitioned) {
+    _isPartitioned = isPartitioned;
   }
 
   public int getPartitionParallelism() {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index d177a1d892..1511ebaf52 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -104,7 +104,8 @@ public class DispatchablePlanVisitor implements PlanNodeVisitor<Void, Dispatchab
   @Override
   public Void visitMailboxSend(MailboxSendNode node, DispatchablePlanContext context) {
     node.getInputs().get(0).visit(this, context);
-    getOrCreateDispatchablePlanMetadata(node, context);
+    DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context);
+    dispatchablePlanMetadata.setPartitioned(node.isPartitioned());
     context.getDispatchablePlanStageRootMap().put(node.getPlanFragmentId(), node);
     return null;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index 519d4d14b3..6fb9c4e798 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -67,7 +67,8 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
           senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverFragmentId, mailboxMetadata);
           receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderFragmentId, mailboxMetadata);
         }
-      } else if (senderMetadata.isPartitionedTableScan() && (numReceivers / numSenders > 0)) {
+      } else if (senderMetadata.isPartitioned() && senderMetadata.getScannedTables().size() > 0
+          && (numReceivers / numSenders > 0)) {
        // For partitioned table scan, send the data to the worker with the same worker id (not necessarily the same
         // instance). When partition parallelism is configured, send the data to the corresponding workers.
         // NOTE: Do not use partitionParallelism from the metadata because it might be configured only in the first
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
index 1d136efb20..201e735c82 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
@@ -48,6 +48,9 @@ public class ExchangeNode extends AbstractPlanNode {
   @ProtoProperties
   private boolean _isSortOnReceiver = false;
 
+  @ProtoProperties
+  private boolean _isPartitioned = false;
+
   @ProtoProperties
   private List<RelFieldCollation> _collations;
 
@@ -63,13 +66,14 @@ public class ExchangeNode extends AbstractPlanNode {
 
   public ExchangeNode(int currentStageId, DataSchema dataSchema, PinotRelExchangeType exchangeType,
       Set<String> tableNames, RelDistribution distribution, List<RelFieldCollation> collations, boolean isSortOnSender,
-      boolean isSortOnReceiver) {
+      boolean isSortOnReceiver, boolean isPartitioned) {
     super(currentStageId, dataSchema);
     _exchangeType = exchangeType;
     _keys = distribution.getKeys();
     _distributionType = distribution.getType();
     _isSortOnSender = isSortOnSender;
     _isSortOnReceiver = isSortOnReceiver;
+    _isPartitioned = isPartitioned;
     _collations = collations;
     _tableNames = tableNames;
   }
@@ -104,6 +108,10 @@ public class ExchangeNode extends AbstractPlanNode {
     return _isSortOnReceiver;
   }
 
+  public boolean isPartitioned() {
+    return _isPartitioned;
+  }
+
   public List<RelFieldCollation> getCollations() {
     return _collations;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
index 1834ac90c1..20f12b7316 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
@@ -47,6 +47,8 @@ public class MailboxSendNode extends AbstractPlanNode {
   private List<RelFieldCollation.Direction> _collationDirections;
   @ProtoProperties
   private boolean _isSortOnSender;
+  @ProtoProperties
+  private boolean _isPartitioned;
 
   public MailboxSendNode(int planFragmentId) {
     super(planFragmentId);
@@ -55,7 +57,7 @@ public class MailboxSendNode extends AbstractPlanNode {
   public MailboxSendNode(int planFragmentId, DataSchema dataSchema, int receiverStageId,
       RelDistribution.Type distributionType, PinotRelExchangeType exchangeType,
       @Nullable List<Integer> distributionKeys, @Nullable List<RelFieldCollation> fieldCollations,
-      boolean isSortOnSender) {
+      boolean isSortOnSender, boolean isPartitioned) {
     super(planFragmentId, dataSchema);
     _receiverStageId = receiverStageId;
     _distributionType = distributionType;
@@ -75,6 +77,7 @@ public class MailboxSendNode extends AbstractPlanNode {
       _collationDirections = Collections.emptyList();
     }
     _isSortOnSender = isSortOnSender;
+    _isPartitioned = isPartitioned;
   }
 
   public int getReceiverStageId() {
@@ -117,9 +120,23 @@ public class MailboxSendNode extends AbstractPlanNode {
     return _isSortOnSender;
   }
 
+  public boolean isPartitioned() {
+    return _isPartitioned;
+  }
+
   @Override
   public String explain() {
-    return "MAIL_SEND(" + _distributionType + ")";
+    StringBuilder sb = new StringBuilder();
+    sb.append("MAIL_SEND(");
+    sb.append(_distributionType);
+    sb.append(')');
+    if (isPartitioned()) {
+      sb.append("[PARTITIONED]");
+    }
+    if (isSortOnSender()) {
+      sb.append("[SORTED]");
+    }
+    return sb.toString();
   }
 
   @Override
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 481cc7d593..5029321162 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -237,7 +237,7 @@ public class WorkerManager {
     metadata.setWorkerIdToServerInstanceMap(workedIdToServerInstanceMap);
     metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
     metadata.setTimeBoundaryInfo(colocatedTableInfo._timeBoundaryInfo);
-    metadata.setPartitionedTableScan(true);
+    metadata.setPartitioned(metadata.isPartitioned());
     metadata.setPartitionParallelism(partitionParallelism);
   }
 
@@ -255,7 +255,7 @@ public class WorkerManager {
     // worker in the first child.
     if (!children.isEmpty()) {
       DispatchablePlanMetadata firstChildMetadata = metadataMap.get(children.get(0).getFragmentId());
-      if (firstChildMetadata.isPartitionedTableScan()) {
+      if (firstChildMetadata.isPartitioned() && firstChildMetadata.getScannedTables().size() > 0) {
         int partitionParallelism = firstChildMetadata.getPartitionParallelism();
         Map<Integer, QueryServerInstance> childWorkerIdToServerInstanceMap =
             firstChildMetadata.getWorkerIdToServerInstanceMap();
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 6d59f8da22..28a386faa0 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -130,20 +130,20 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
         Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
                 .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
                 .collect(Collectors.toSet()),
-            tableName.equals("a") ? ImmutableList.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]")
-                : ImmutableList.of("localhost@{1,1}|[0]"));
+            tableName.equals("a") ? ImmutableList.of("localhost:1|[1]", "localhost:2|[0]")
+                : ImmutableList.of("localhost:1|[0]"));
       } else if (!PlannerUtils.isRootPlanFragment(stageId)) {
         // join stage should have both servers used.
         Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
                 .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
                 .collect(Collectors.toSet()),
-            ImmutableSet.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]"));
+            ImmutableSet.of("localhost:1|[1]", "localhost:2|[0]"));
       } else {
         // reduce stage should have the reducer instance.
         Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
                 .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
                 .collect(Collectors.toSet()),
-            ImmutableSet.of("localhost@{3,3}|[0]"));
+            ImmutableSet.of("localhost:3|[0]"));
       }
     }
   }
@@ -253,13 +253,13 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
         Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
                 .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
                 .collect(Collectors.toSet()),
-            ImmutableList.of("localhost@{1,1}|[0]"));
+            ImmutableList.of("localhost:1|[0]"));
       } else if (!PlannerUtils.isRootPlanFragment(stageId)) {
         // join stage should have both servers used.
         Assert.assertEquals(dispatchablePlanFragment.getServerInstanceToWorkerIdMap().entrySet().stream()
                 .map(PhysicalExplainPlanVisitor::stringifyQueryServerInstanceToWorkerIdsEntry)
                 .collect(Collectors.toSet()),
-            ImmutableList.of("localhost@{1,1}|[1]", "localhost@{2,2}|[0]"));
+            ImmutableList.of("localhost:1|[1]", "localhost:2|[0]"));
       }
     }
   }
diff --git a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
index 9f9173a107..37b338da16 100644
--- a/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/ExplainPhysicalPlans.json
@@ -5,74 +5,386 @@
         "description": "explain plan with attributes",
         "sql": "EXPLAIN IMPLEMENTATION PLAN INCLUDING ALL ATTRIBUTES FOR SELECT col1, col3 FROM a",
         "output": [
-          "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
-          "    └── [1]@localhost:2 PROJECT\n",
-          "        └── [1]@localhost:2 TABLE SCAN (a) null\n"
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] PROJECT\n",
+          "        └── [1]@localhost:2|[0] TABLE SCAN (a) null\n",
+          ""
         ]
       },
       {
         "description": "explain plan without attributes",
         "sql": "EXPLAIN IMPLEMENTATION PLAN EXCLUDING ATTRIBUTES FOR SELECT col1, COUNT(*) FROM a GROUP BY col1",
         "output": [
-          "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
-          "    └── [1]@localhost:2 AGGREGATE_FINAL\n",
-          "        └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]} (Subtree Omitted)\n",
-          "            └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
-          "                └── [2]@localhost:2 AGGREGATE_LEAF\n",
-          "                    └── [2]@localhost:2 TABLE SCAN (a) null\n"
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            └── [2]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "                └── [2]@localhost:2|[0] AGGREGATE_LEAF\n",
+          "                    └── [2]@localhost:2|[0] TABLE SCAN (a) null\n",
+          ""
         ]
       },
       {
         "description": "explain plan with join",
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
         "output": [
-          "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
-          "    └── [1]@localhost:2 PROJECT\n",
-          "        └── [1]@localhost:2 JOIN\n",
-          "            ├── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │   ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]} (Subtree Omitted)\n",
-          "            │   └── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
-          "            │       └── [2]@localhost:2 PROJECT\n",
-          "            │           └── [2]@localhost:2 TABLE SCAN (a) null\n",
-          "            └── [1]@localhost:2 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{1,1}|[1],[1]@localhost@{2,2}|[0]}\n",
-          "                    └── [3]@localhost:1 PROJECT\n",
-          "                        └── [3]@localhost:1 TABLE SCAN (b) null\n"
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] PROJECT\n",
+          "        └── [1]@localhost:2|[0] JOIN\n",
+          "            ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            │   └── [2]@localhost:2|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "            │       └── [2]@localhost:2|[0] PROJECT\n",
+          "            │           └── [2]@localhost:2|[0] TABLE SCAN (a) null\n",
+          "            └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                └── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "                    └── [3]@localhost:1|[0] PROJECT\n",
+          "                        └── [3]@localhost:1|[0] TABLE SCAN (b) null\n",
+          ""
         ]
       },
       {
-        "description": "explain plan with join with colocated tables",
+        "sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "explain plan with join with colocated tables, not on partition key",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col1 = b.col2 WHERE b.col3 > 0",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] PROJECT\n",
+          "        └── [1]@localhost:2|[0] JOIN\n",
+          "            ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            │   ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            │   ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            │   └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "            │       └── [2]@localhost:1|[1] PROJECT\n",
+          "            │           └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+          "            └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "                ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "                ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "                └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "                    └── [3]@localhost:1|[1] PROJECT\n",
+          "                        └── [3]@localhost:1|[1] FILTER\n",
+          "                            └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
+          ""
+        ]
+      },
+      {
+        "description": "explain plan with join with colocated tables, one-sided on partition key",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col1 = b.col1 WHERE b.col3 > 0",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] PROJECT\n",
+          "        └── [1]@localhost:2|[0] JOIN\n",
+          "            ├── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            │   ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            │   ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            │   └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "            │       └── [2]@localhost:1|[1] PROJECT\n",
+          "            │           └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+          "            └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "                ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "                ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "                └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "                    └── [3]@localhost:1|[1] PROJECT\n",
+          "                        └── [3]@localhost:1|[1] FILTER\n",
+          "                            └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
+          ""
+        ]
+      },
+      {
+        "description": "explain plan with join with colocated tables, both-sided on partition key",
         "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, a.col3, b.col3 FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */ JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */ ON a.col2 = b.col1 WHERE b.col3 > 0",
         "output": [
-          "[0]@localhost:3 MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
-          "├── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
-          "├── [1]@localhost:2 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
-          "├── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]} (Subtree Omitted)\n",
-          "└── [1]@localhost:1 MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost@{3,3}|[0]}\n",
-          "    └── [1]@localhost:1 PROJECT\n",
-          "        └── [1]@localhost:1 JOIN\n",
-          "            ├── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "            │   ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
-          "            │   ├── [2]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
-          "            │   ├── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
-          "            │   └── [2]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]}\n",
-          "            │       └── [2]@localhost:1 PROJECT\n",
-          "            │           └── [2]@localhost:1 TABLE SCAN (a) null\n",
-          "            └── [1]@localhost:1 MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
-          "                ├── [3]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
-          "                ├── [3]@localhost:2 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
-          "                ├── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]} (Subtree Omitted)\n",
-          "                └── [3]@localhost:1 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost@{2,2}|[2, 3],[1]@localhost@{1,1}|[0, 1]}\n",
-          "                    └── [3]@localhost:1 PROJECT\n",
-          "                        └── [3]@localhost:1 FILTER\n",
-          "                            └── [3]@localhost:1 TABLE SCAN (b) null\n"
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "├── [1]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "├── [1]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[1] PROJECT\n",
+          "        └── [1]@localhost:1|[1] JOIN\n",
+          "            ├── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            │   ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n",
+          "            │   ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n",
+          "            │   ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n",
+          "            │   └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
+          "            │       └── [2]@localhost:1|[1] PROJECT\n",
+          "            │           └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+          "            └── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n",
+          "                ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n",
+          "                ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n",
+          "                └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
+          "                    └── [3]@localhost:1|[1] PROJECT\n",
+          "                        └── [3]@localhost:1|[1] FILTER\n",
+          "                            └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
+          ""
+        ]
+      },
+      {
+        "description": "explain plan with semi-join + group-by with colocated tables",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, COUNT(*) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */  WHERE a.col2 IN (   SELECT b.col1    FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */    WHERE b.col1 = 'z' )  GROUP BY a.col1",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+          "                    └── [2]@localhost:1|[1] JOIN\n",
+          "                        ├── [2]@localhost:1|[1] PROJECT\n",
+          "                        │   └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+          "                        └── [2]@localhost:1|[1] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "                            ├── [3]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+          "                            ├── [3]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+          "                            ├── [3]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+          "                            └── [3]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]}\n",
+          "                                └── [3]@localhost:1|[1] PROJECT\n",
+          "                                    └── [3]@localhost:1|[1] FILTER\n",
+          "                                        └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
+          ""
+        ]
+      },
+      {
+        "description": "explain plan with semi-join + group-by with colocated tables with colocated hints",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ a.col1, COUNT(*) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */  WHERE a.col2 IN (   SELECT b.col1    FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */    WHERE b.col1 = 'z' )  GROUP BY a.col1",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+          "                    └── [2]@localhost:1|[1] JOIN\n",
+          "                        ├── [2]@localhost:1|[1] PROJECT\n",
+          "                        │   └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+          "                        └── [2]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                            ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[2]} (Subtree Omitted)\n",
+          "                            ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n",
+          "                            ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[0]} (Subtree Omitted)\n",
+          "                            └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
+          "                                └── [3]@localhost:1|[1] PROJECT\n",
+          "                                    └── [3]@localhost:1|[1] FILTER\n",
+          "                                        └── [3]@localhost:1|[1] TABLE SCAN (b) null\n",
+          ""
+        ]
+      },
+      {
+        "description": "explain plan with direct group-by with pre-partitioned tables on partition key",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */  GROUP BY a.col2",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "├── [1]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "├── [1]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[1] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n",
+          "            └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
+          "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+          "                    └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+          ""
+        ]
+      },
+      {
+        "description": "explain plan with direct group-by with pre-partitioned tables on non-partition key",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */  GROUP BY a.col1",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+          "                    └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+          ""
+        ]
+      },
+      {
+        "description": "explain plan with direct group-by with pre-partitioned tables on both partition and non-partition key",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */  GROUP BY a.col1, a.col2",
+        "comments": "TODO: currently we do not support minimum hash-set thus we repartition from col2 to (col1, col2), but once we support this should not have repartitioning.",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+          "                    └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+          ""
+        ]
+      },
+      {
+        "description": "explain plan with direct inner join + group-by with colocated tables on join key",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, SUM(a.col3) FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */  JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */    ON a.col2 = b.col1  WHERE b.col3 > 0  GROUP BY a.col2",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+          "                    └── [2]@localhost:1|[1] JOIN\n",
+          "                        ├── [2]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                        │   ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[2]} (Subtree Omitted)\n",
+          "                        │   ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n",
+          "                        │   ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[0]} (Subtree Omitted)\n",
+          "                        │   └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
+          "                        │       └── [3]@localhost:1|[1] PROJECT\n",
+          "                        │           └── [3]@localhost:1|[1] TABLE SCAN (a) null\n",
+          "                        └── [2]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                            ├── [4]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[2]} (Subtree Omitted)\n",
+          "                            ├── [4]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n",
+          "                            ├── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[0]} (Subtree Omitted)\n",
+          "                            └── [4]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
+          "                                └── [4]@localhost:1|[1] PROJECT\n",
+          "                                    └── [4]@localhost:1|[1] FILTER\n",
+          "                                        └── [4]@localhost:1|[1] TABLE SCAN (b) null\n",
+          ""
+        ]
+      },
+      {
+        "description": "explain plan with direct inner join + group-by with colocated tables on non-join key",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, SUM(a.col3)  FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */  JOIN b /*+ tableOptions(partition_key='col1', partition_size='4') */    ON a.col2 = b.col1  WHERE b.col3 > 0  GROUP BY a.col1",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+          "                    └── [2]@localhost:1|[1] JOIN\n",
+          "                        ├── [2]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                        │   ├── [3]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[2]} (Subtree Omitted)\n",
+          "                        │   ├── [3]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n",
+          "                        │   ├── [3]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[0]} (Subtree Omitted)\n",
+          "                        │   └── [3]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
+          "                        │       └── [3]@localhost:1|[1] PROJECT\n",
+          "                        │           └── [3]@localhost:1|[1] TABLE SCAN (a) null\n",
+          "                        └── [2]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                            ├── [4]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[2]} (Subtree Omitted)\n",
+          "                            ├── [4]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:2|[3]} (Subtree Omitted)\n",
+          "                            ├── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[0]} (Subtree Omitted)\n",
+          "                            └── [4]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@localhost:1|[1]}\n",
+          "                                └── [4]@localhost:1|[1] PROJECT\n",
+          "                                    └── [4]@localhost:1|[1] FILTER\n",
+          "                                        └── [4]@localhost:1|[1] TABLE SCAN (b) null\n",
+          ""
+        ]
+      },
+      {
+        "description": "explain plan with semi join from grouped right table, then group-by with colocated tables on join key",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col2, SUM(a.col3)  FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */  WHERE a.col2 IN (   SELECT col2    FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */    WHERE b.col3 > 0    GROUP BY col2 HAVING COUNT(*) > 1 )  GROUP BY a.col2",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "├── [1]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "├── [1]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:1|[1] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n",
+          "            └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
+          "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+          "                    └── [2]@localhost:1|[1] JOIN\n",
+          "                        ├── [2]@localhost:1|[1] PROJECT\n",
+          "                        │   └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+          "                        └── [2]@localhost:1|[1] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "                            ├── [3]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+          "                            └── [3]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]}\n",
+          "                                └── [3]@localhost:2|[0] FILTER\n",
+          "                                    └── [3]@localhost:2|[0] AGGREGATE_FINAL\n",
+          "                                        └── [3]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                                            ├── [4]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[1],[3]@localhost:2|[0]} (Subtree Omitted)\n",
+          "                                            ├── [4]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[1],[3]@localhost:2|[0]} (Subtree Omitted)\n",
+          "                                            ├── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[1],[3]@localhost:2|[0]} (Subtree Omitted)\n",
+          "                                            └── [4]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[3]@localhost:1|[1],[3]@localhost:2|[0]}\n",
+          "                                                └── [4]@localhost:1|[1] AGGREGATE_LEAF\n",
+          "                                                    └── [4]@localhost:1|[1] PROJECT\n",
+          "                                                        └── [4]@localhost:1|[1] FILTER\n",
+          "                                                            └── [4]@localhost:1|[1] TABLE SCAN (b) null\n",
+          ""
+        ]
+      },
+      {
+        "description": "explain plan with semi join from grouped right table, then group-by with colocated tables on non-join key",
+        "sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.col1, SUM(a.col3)  FROM a /*+ tableOptions(partition_key='col2', partition_size='4') */  WHERE a.col2 IN (   SELECT col1    FROM b /*+ tableOptions(partition_key='col1', partition_size='4') */    WHERE b.col3 > 0    GROUP BY col1 HAVING COUNT(*) > 1 )  GROUP BY a.col1",
+        "output": [
+          "[0]@localhost:3|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "├── [1]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]} (Subtree Omitted)\n",
+          "└── [1]@localhost:2|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@localhost:3|[0]}\n",
+          "    └── [1]@localhost:2|[0] AGGREGATE_FINAL\n",
+          "        └── [1]@localhost:2|[0] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "            ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)\n",
+          "            └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}\n",
+          "                └── [2]@localhost:1|[1] AGGREGATE_LEAF\n",
+          "                    └── [2]@localhost:1|[1] JOIN\n",
+          "                        ├── [2]@localhost:1|[1] PROJECT\n",
+          "                        │   └── [2]@localhost:1|[1] TABLE SCAN (a) null\n",
+          "                        └── [2]@localhost:1|[1] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)\n",
+          "                            ├── [3]@localhost:2|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+          "                            ├── [3]@localhost:2|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+          "                            ├── [3]@localhost:1|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]} (Subtree Omitted)\n",
+          "                            └── [3]@localhost:1|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[2]@localhost:1|[0, 1],[2]@localhost:2|[2, 3]}\n",
+          "                                └── [3]@localhost:1|[1] FILTER\n",
+          "                                    └── [3]@localhost:1|[1] AGGREGATE_FINAL\n",
+          "                                        └── [3]@localhost:1|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)\n",
+          "                                            ├── [4]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:2|[2]} (Subtree Omitted)\n",
+          "                                            ├── [4]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:2|[3]} (Subtree Omitted)\n",
+          "                                            ├── [4]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:1|[0]} (Subtree Omitted)\n",
+          "                                            └── [4]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[3]@localhost:1|[1]}\n",
+          "                                                └── [4]@localhost:1|[1] AGGREGATE_LEAF\n",
+          "                                                    └── [4]@localhost:1|[1] PROJECT\n",
+          "                                                        └── [4]@localhost:1|[1] FILTER\n",
+          "                                                            └── [4]@localhost:1|[1] TABLE SCAN (b) null\n",
+          ""
         ]
       }
     ]
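
Note on reading the expected plans above (a hedged summary inferred purely from the plan strings in this fixture, not from the planner code): each line appears to follow the pattern [fragmentId]@host:port|[workerId] OPERATOR, MAIL_SEND nodes list their receiving mailboxes after "->", "(Subtree Omitted)" marks senders whose subtree is identical to the sibling printed in full, and the [PARTITIONED] tag marks sends that preserve the hinted partitioning. For example, taking a line that already appears above:

    [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}
    -- fragment 2, executed on server localhost:1 as worker 1, performs a hash-distributed,
    -- partition-preserving send whose only target is fragment 1's worker 1 on the same
    -- server; this one-to-one target set is what the colocated test cases assert.
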
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index cd4856a7e7..c05f3158df 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -120,6 +120,7 @@
     ]
   },
   "hint_option_queries_unmatched_partition": {
+    "comments": "this section specifically deal with unmatched partition tagging of table options",
     "tables": {
       "tbl1": {
         "schema": [
@@ -297,35 +298,61 @@
         "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(*), SUM({tbl1}.val), SUM({tbl1}.num) FROM {tbl1} WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' GROUP BY {tbl1}.num"
       },
       {
-        "description": "join with colocated tables",
+        "description": "join with pre-partitioned left and right tables",
         "sql": "SELECT {tbl1}.num, {tbl1}.val, {tbl2}.data FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0"
       },
+      {
+        "description": "join with pre-partitioned left and right tables; colocated hint",
+        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl1}.val, {tbl2}.data FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0"
+      },
       {
         "description": "group by with pre-partitioned tables on partition column",
         "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
       },
       {
         "description": "group by with pre-partitioned tables on non-partition column",
-        "ignored": true,
-        "comment": "tableOption usage implies direct exchange (e.g. without shuffle) this should be fix in the future",
         "sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.name"
       },
       {
-        "description": "join with colocated tables then group-by left join key column",
+        "description": "join with pre-partitioned left and right tables then group-by left join key column",
         "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num"
       },
       {
-        "description": "join with colocated tables then group-by left join key column and other columns",
+        "description": "join with pre-partitioned left and right tables then group-by left join key column; colocated hint",
+        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "join with pre-partitioned left and right tables then group-by left join key column and other columns",
+        "sql": "SELECT {tbl1}.num, {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num, {tbl2}.id"
+      },
+      {
+        "description": "join with pre-partitioned left and right tables then group-by left join key column and other columns; colocated hint",
+        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num, {tbl2}.id"
+      },
+      {
+        "description": "join with pre-partitioned left and right tables then group-by left join key column and other columns",
         "sql": "SELECT {tbl1}.num, {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num, {tbl2}.id"
       },
       {
-        "description": "join with colocated tables then group-by other columns",
+        "description": "join with pre-partitioned left and right tables then group-by left join key column and other columns; colocated hint",
+        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num, {tbl2}.id"
+      },
+      {
+        "description": "join with pre-partitioned left and right tables then group-by other columns",
         "sql": "SELECT {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl2}.id"
       },
       {
-        "description": "agg + semi-join on colocated tables then group by on partition column",
+        "description": "join with pre-partitioned left and right tables then group-by other columns; colocated hint",
+        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl2}.id"
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main and side tables then group by on partition column",
         "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
       },
+      {
+        "description": "agg + semi-join on pre-partitioned main and side tables then group by on partition column; colocated hint",
+        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
+      },
       {
         "description": "agg + semi-join on pre-partitioned main tables then group by on partition column",
         "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
@@ -335,40 +362,177 @@
         "sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.name"
       },
       {
-        "description": "agg + semi-join on pre-partitioned main tables with group by on partitioned column on semi table",
+        "description": "agg + semi-join on pre-partitioned main table with group by on partitioned column on pre-partitioned semi table",
         "sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0 GROUP BY num HAVING COUNT(*) > 1) GROUP BY {tbl1}.name"
       },
       {
-        "description": "agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join",
+        "description": "agg + semi-join on pre-partitioned main table with group by on partitioned column on pre-partitioned semi table; colocated hint",
+        "ignored": true,
+        "comments": "although both table are partitioned, but the partition function for agg-having semi table is not the same as partition function of the main table, and thus cannot use colocated hint; This is not detected currently and thus will produce incorrect results",
+        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0 GROUP BY num HAVING COUNT(*) > 1) GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main and side tables with group by on partitioned column with having filter on top of semi join",
         "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
       },
       {
-        "description": "agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join",
+        "description": "agg + semi-join on pre-partitioned main and side tables with group by on partitioned column with having filter on top of semi join; colocated hint",
+        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
+      },
+      {
+        "description": "agg + semi-join on pre-partitioned main and side tables with group by on partitioned column with sorting on top of semi join",
         "sql": "SELECT {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.name ORDER BY SUM({tbl1}.val) DESC"
       },
       {
-        "description": "co-partition agg + semi-join with colocated tables & agg hint",
+        "description": "agg + semi-join on pre-partitioned main and side tables with group by on partitioned column with sorting on top of semi join; colocated hint",
+        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.name, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.name ORDER BY SUM({tbl1}.val) DESC"
+      },
+      {
+        "description": "partition agg + semi-join with pre-partitioned main and side tables & agg hint",
         "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
       },
       {
-        "description": "co-partition agg + semi-join with single table partition & agg hint",
+        "description": "partition agg + semi-join with pre-partitioned main and side table & agg hint and colocated hint",
+        "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "partition agg + semi-join with single table partition & agg hint",
         "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
       },
       {
-        "description": "co-partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join",
+        "description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join",
         "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
       },
       {
-        "description": "co-partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join colocated on partition key",
+        "description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join; colocated hint",
+        "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
+      },
+      {
+        "description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join colocated on partition key",
         "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC"
       },
       {
-        "description": "semi-join on pre-partitioned main tables with sorting on top of semi join on join key",
+        "description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join colocated on partition key; colocated hint",
+        "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC"
+      },
+      {
+        "description": "semi-join on pre-partitioned main and side tables with sorting on top of semi join on join key",
         "sql": "SELECT {tbl1}.num FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) ORDER BY {tbl1}.num DESC"
       },
       {
-        "description": "semi-join on pre-partitioned main tables with sorting on top of semi join with non-join columns",
+        "description": "semi-join on pre-partitioned main and side tables with sorting on top of semi join on join key; colocated hint",
+        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) ORDER BY {tbl1}.num DESC"
+      },
+      {
+        "description": "semi-join on pre-partitioned main and side tables with sorting on top of semi join with non-join columns",
         "sql": "SELECT {tbl1}.name, {tbl1}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) ORDER BY {tbl1}.val DESC"
+      },
+      {
+        "description": "semi-join on pre-partitioned main and side tables with sorting on top of semi join with non-join columns; colocated hint",
+        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.name, {tbl1}.val FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) ORDER BY {tbl1}.val DESC"
+      }
+    ]
+  },
+  "hint_option_rel_trait_partition": {
+    "comments": "this section specifically test behavior of relational trait / partition inferences and aggressively",
+    "tables": {
+      "tbl1": {
+        "schema": [
+          {"name": "num", "type": "INT"},
+          {"name": "name", "type": "STRING"},
+          {"name": "val", "type": "INT"}
+        ],
+        "inputs": [
+          [1, "a", 23],
+          [2, "b", 34],
+          [3, "c", 45],
+          [3, "yyy", 56],
+          [4, "e", 12],
+          [4, "e", 23],
+          [6, "e", 34],
+          [7, "d", 45],
+          [7, "f", 56],
+          [8, "z", 67]
+        ],
+        "partitionColumns": [
+          "num"
+        ]
+      },
+      "tbl2": {
+        "schema": [
+          {"name": "num", "type": "INT"},
+          {"name": "id", "type": "STRING"},
+          {"name": "data", "type": "INT"}
+        ],
+        "inputs": [
+          [1, "xxx", 2],
+          [1, "xxx", 4],
+          [3, "yyy", 5],
+          [3, "zzz", 6],
+          [5, "zzz", 12],
+          [6, "e", 23],
+          [7, "d", 34],
+          [8, "z", 45]
+        ],
+        "partitionColumns": [
+          "num"
+        ]
+      },
+      "tbl3": {
+        "schema": [
+          {"name": "num", "type": "INT"},
+          {"name": "id", "type": "STRING"},
+          {"name": "data", "type": "INT"}
+        ],
+        "inputs": [
+          [1, "xxx", 12],
+          [2, "xxx", 23],
+          [3, "yyy", 34],
+          [4, "yyy", 45],
+          [5, "zzz", 12]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "colocated join, with hint, not on partition key",
+        "sql": "SELECT {tbl1}.num, {tbl1}.val, {tbl2}.data FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.val = {tbl2}.data WHERE {tbl2}.data > 0"
+      },
+      {
+        "description": "colocated join, with hint, one-side on partition key",
+        "sql": "SELECT {tbl1}.num, {tbl1}.val, {tbl2}.data FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.data WHERE {tbl2}.data > 0"
+      },
+      {
+        "description": "colocated join, with hint, both-side on partition key",
+        "sql": "SELECT {tbl1}.num, {tbl1}.val, {tbl2}.data FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0"
+      },
+      {
+        "description": "colocated join then group by, not on partition key, group-by join key",
+        "sql": "SELECT {tbl1}.val, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.val = {tbl2}.data WHERE {tbl2}.data > 0 GROUP BY {tbl1}.val"
+      },
+      {
+        "description": "colocated join then group by, not on partition key, group-by non-join key",
+        "sql": "SELECT {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.val = {tbl2}.data WHERE {tbl2}.data > 0 GROUP BY {tbl2}.id"
+      },
+      {
+        "description": "colocated join then group by, one-side on partition key, group-by join key",
+        "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.data WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "colocated join then group by, one-side on partition key, group-by non-join key",
+        "sql": "SELECT {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.data WHERE {tbl2}.data > 0 GROUP BY {tbl2}.id"
+      },
+      {
+        "description": "colocated join then group by, both-side on partition key, group-by left join key",
+        "sql": "SELECT {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl1}.num"
+      },
+      {
+        "description": "colocated join then group by, both-side on partition key, group-by right join key",
+        "sql": "SELECT {tbl2}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl2}.num"
+      },
+      {
+        "description": "colocated join then group by, both-side on partition key, group-by non-join key",
+        "sql": "SELECT {tbl2}.id, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num WHERE {tbl2}.data > 0 GROUP BY {tbl2}.id"
       }
     ]
   }
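
For reference, a minimal sketch of how the two hint families exercised by these new cases compose in one query (t1/t2 and the column names below are placeholders mirroring the fixture tables, not identifiers from the patch; both tables are assumed pre-partitioned on num into 4 partitions):

    SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */
           t1.num, SUM(t1.val)
    FROM t1 /*+ tableOptions(partition_key='num', partition_size='4') */
    JOIN t2 /*+ tableOptions(partition_key='num', partition_size='4') */
      ON t1.num = t2.num
    WHERE t2.data > 0
    GROUP BY t1.num

The tableOptions hint declares how each physical table is already partitioned, while joinOptions(is_colocated_by_join_keys='true') additionally asserts that both sides share the same partition function on the join keys; judging from the expected plans above, that combination is what lets the planner keep the one-to-one [PARTITIONED] exchanges instead of a full hash shuffle.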


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org