You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/14 14:43:29 UTC

[pinot] branch master updated: adding PinotLogicalExchange to add exchange type indicator (#10813)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bb4091f12f adding PinotLogicalExchange to add exchange type indicator (#10813)
bb4091f12f is described below

commit bb4091f12f380a739ae08619c18c7874223d1be4
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Wed Jun 14 07:43:20 2023 -0700

    adding PinotLogicalExchange to add exchange type indicator (#10813)
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../calcite/rel/logical/PinotLogicalExchange.java  |  94 +++++++
 .../rel/logical/PinotLogicalSortExchange.java      |  31 ++-
 .../calcite/rel/logical/PinotRelExchangeType.java  |  49 ++++
 .../PinotAggregateExchangeNodeInsertRule.java      |  12 +-
 .../rel/rules/PinotJoinExchangeNodeInsertRule.java |  14 +-
 .../rel/rules/PinotJoinToDynamicBroadcastRule.java |  22 +-
 .../apache/calcite/rel/rules/PinotRuleUtils.java   |   3 +-
 .../rules/PinotSetOpExchangeNodeInsertRule.java    |   4 +-
 .../rules/PinotWindowExchangeNodeInsertRule.java   |  19 +-
 .../apache/pinot/query/planner/PlanFragment.java   |   2 +-
 .../planner/logical/PinotLogicalQueryPlanner.java  |   7 +-
 .../query/planner/logical/PlanFragmenter.java      |  14 +-
 .../planner/logical/RelToPlanNodeConverter.java    |  13 +-
 .../planner/logical/ShuffleRewriteVisitor.java     |   4 +-
 .../query/planner/logical/SubPlanFragmenter.java   |   4 +-
 .../colocated/GreedyShuffleRewriteVisitor.java     |   8 +-
 .../pinot/query/planner/plannode/ExchangeNode.java |  19 +-
 .../query/planner/plannode/MailboxReceiveNode.java |  23 +-
 .../query/planner/plannode/MailboxSendNode.java    |  23 +-
 .../apache/pinot/query/QueryCompilationTest.java   |  16 +-
 .../src/test/resources/queries/AggregatePlans.json |  16 +-
 .../src/test/resources/queries/GroupByPlans.json   |  34 +--
 .../src/test/resources/queries/JoinPlans.json      |  98 +++----
 .../src/test/resources/queries/OrderByPlans.json   |   8 +-
 .../test/resources/queries/PinotHintablePlans.json |  34 +--
 .../src/test/resources/queries/SetOpPlans.json     |  38 +--
 .../resources/queries/WindowFunctionPlans.json     | 298 ++++++++++-----------
 .../apache/pinot/query/runtime/QueryRunner.java    |   2 +-
 .../query/runtime/plan/PhysicalPlanVisitor.java    |   6 +-
 .../plan/pipeline/PipelineBreakerVisitor.java      |   5 +-
 30 files changed, 569 insertions(+), 351 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalExchange.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalExchange.java
new file mode 100644
index 0000000000..c08b7ae8ad
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalExchange.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Exchange;
+
+
+/**
+ * Pinot's implementation of {@link Exchange} which needs information about whether to exchange is
+ * done on a streaming or a pipeline-breaking fashion.
+ */
+public class PinotLogicalExchange extends Exchange {
+  private final PinotRelExchangeType _exchangeType;
+
+  private PinotLogicalExchange(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, RelDistribution distribution, PinotRelExchangeType exchangeType) {
+    super(cluster, traitSet, input, distribution);
+    _exchangeType = exchangeType;
+    assert traitSet.containsIfApplicable(Convention.NONE);
+  }
+
+
+  public static PinotLogicalExchange create(RelNode input,
+      RelDistribution distribution) {
+    return create(input, distribution, PinotRelExchangeType.getDefaultExchangeType());
+  }
+
+  /**
+   * Creates a LogicalExchange.
+   *
+   * @param input     Input relational expression
+   * @param distribution Distribution specification
+   * @param exchangeType RelExchangeType specification
+   */
+  public static PinotLogicalExchange create(RelNode input,
+      RelDistribution distribution, PinotRelExchangeType exchangeType) {
+    RelOptCluster cluster = input.getCluster();
+    distribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
+    RelTraitSet traitSet =
+        input.getTraitSet().replace(Convention.NONE).replace(distribution);
+    return new PinotLogicalExchange(cluster, traitSet, input, distribution, exchangeType);
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  @Override
+  public Exchange copy(RelTraitSet traitSet, RelNode newInput,
+      RelDistribution newDistribution) {
+    return new PinotLogicalExchange(getCluster(), traitSet, newInput,
+        newDistribution, _exchangeType);
+  }
+
+  @Override
+  public RelNode accept(RelShuttle shuttle) {
+    return shuttle.visit(this);
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    RelWriter relWriter = super.explainTerms(pw);
+    if (_exchangeType != PinotRelExchangeType.getDefaultExchangeType()) {
+      relWriter.item("relExchangeType", _exchangeType);
+    }
+    return relWriter;
+  }
+
+  public PinotRelExchangeType getExchangeType() {
+    return _exchangeType;
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java
index e019a680c1..ace06a6c43 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java
@@ -44,11 +44,13 @@ public class PinotLogicalSortExchange extends SortExchange {
 
   protected final boolean _isSortOnSender;
   protected final boolean _isSortOnReceiver;
+  protected final PinotRelExchangeType _exchangeType;
 
   private PinotLogicalSortExchange(RelOptCluster cluster, RelTraitSet traitSet,
-      RelNode input, RelDistribution distribution, RelCollation collation,
+      RelNode input, RelDistribution distribution, PinotRelExchangeType exchangeType, RelCollation collation,
       boolean isSortOnSender, boolean isSortOnReceiver) {
     super(cluster, traitSet, input, distribution, collation);
+    _exchangeType = exchangeType;
     _isSortOnSender = isSortOnSender;
     _isSortOnReceiver = isSortOnReceiver;
   }
@@ -58,15 +60,27 @@ public class PinotLogicalSortExchange extends SortExchange {
    */
   public PinotLogicalSortExchange(RelInput input) {
     super(input);
+    _exchangeType = PinotRelExchangeType.STREAMING;
     _isSortOnSender = false;
     _isSortOnReceiver = true;
   }
 
+  public static PinotLogicalSortExchange create(
+      RelNode input,
+      RelDistribution distribution,
+      RelCollation collation,
+      boolean isSortOnSender,
+      boolean isSortOnReceiver) {
+    return create(input, distribution, PinotRelExchangeType.getDefaultExchangeType(), collation, isSortOnSender,
+        isSortOnReceiver);
+  }
+
   /**
    * Creates a PinotLogicalSortExchange.
    *
    * @param input     Input relational expression
    * @param distribution Distribution specification
+   * @param exchangeType Exchange type specification
    * @param collation array of sort specifications
    * @param isSortOnSender whether to sort on the sender
    * @param isSortOnReceiver whether to sort on receiver
@@ -74,6 +88,7 @@ public class PinotLogicalSortExchange extends SortExchange {
   public static PinotLogicalSortExchange create(
       RelNode input,
       RelDistribution distribution,
+      PinotRelExchangeType exchangeType,
       RelCollation collation,
       boolean isSortOnSender,
       boolean isSortOnReceiver) {
@@ -82,7 +97,7 @@ public class PinotLogicalSortExchange extends SortExchange {
     distribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
     RelTraitSet traitSet =
         input.getTraitSet().replace(Convention.NONE).replace(distribution).replace(collation);
-    return new PinotLogicalSortExchange(cluster, traitSet, input, distribution,
+    return new PinotLogicalSortExchange(cluster, traitSet, input, distribution, exchangeType,
         collation, isSortOnSender, isSortOnReceiver);
   }
 
@@ -92,14 +107,18 @@ public class PinotLogicalSortExchange extends SortExchange {
   public SortExchange copy(RelTraitSet traitSet, RelNode newInput,
       RelDistribution newDistribution, RelCollation newCollation) {
     return new PinotLogicalSortExchange(this.getCluster(), traitSet, newInput,
-        newDistribution, newCollation, _isSortOnSender, _isSortOnReceiver);
+        newDistribution, _exchangeType, newCollation, _isSortOnSender, _isSortOnReceiver);
   }
 
   @Override
   public RelWriter explainTerms(RelWriter pw) {
-    return super.explainTerms(pw)
+    RelWriter relWriter = super.explainTerms(pw)
         .item("isSortOnSender", _isSortOnSender)
         .item("isSortOnReceiver", _isSortOnReceiver);
+    if (_exchangeType != PinotRelExchangeType.getDefaultExchangeType()) {
+      relWriter.item("relExchangeType", _exchangeType);
+    }
+    return relWriter;
   }
 
   public boolean isSortOnSender() {
@@ -109,4 +128,8 @@ public class PinotLogicalSortExchange extends SortExchange {
   public boolean isSortOnReceiver() {
     return _isSortOnReceiver;
   }
+
+  public PinotRelExchangeType getExchangeType() {
+    return _exchangeType;
+  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotRelExchangeType.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotRelExchangeType.java
new file mode 100644
index 0000000000..4a06e1d025
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotRelExchangeType.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.logical;
+
+/** Type of exchange. */
+public enum PinotRelExchangeType {
+
+  /**
+   * A streaming exchange is one that data will be sent and received in a streaming fashion.
+   */
+  STREAMING,
+
+  /**
+   * A sub-plan exchange is one that multi-stage query plan will execute the sub-tree below the exchange first; then
+   * treat the result as a {@link org.apache.calcite.rex.RexLiteral}; and put back into the logical plan for further
+   * optimization from the {@link org.apache.calcite.plan.RelOptPlanner}.
+   *
+   * <p>This is useful when plan can be further optimized if a sub plan is executed first from the broker.</p>
+   */
+  SUB_PLAN,
+
+  /**
+   * A pipeline-breaker is one that data will be sent streamingly from sender, but receiver will not start execution
+   * until all data has been received successfully.
+   *
+   * <p>This is useful when logical plan is fixed, but physical plan can be further optimized on the server.</p>
+   */
+  PIPELINE_BREAKER;
+
+  public static PinotRelExchangeType getDefaultExchangeType() {
+    return STREAMING;
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
index e5fcb1a9ca..c0523f47ca 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -39,8 +39,8 @@ import org.apache.calcite.rel.hint.PinotHintOptions;
 import org.apache.calcite.rel.hint.PinotHintStrategyTable;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.logical.LogicalAggregate;
-import org.apache.calcite.rel.logical.LogicalExchange;
 import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
@@ -134,11 +134,11 @@ public class PinotAggregateExchangeNodeInsertRule extends RelOptRule {
 
     // 2. attach exchange.
     List<Integer> groupSetIndices = ImmutableIntList.range(0, oldAggRel.getGroupCount());
-    LogicalExchange exchange = null;
+    PinotLogicalExchange exchange = null;
     if (groupSetIndices.size() == 0) {
-      exchange = LogicalExchange.create(newLeafAgg, RelDistributions.hash(Collections.emptyList()));
+      exchange = PinotLogicalExchange.create(newLeafAgg, RelDistributions.hash(Collections.emptyList()));
     } else {
-      exchange = LogicalExchange.create(newLeafAgg, RelDistributions.hash(groupSetIndices));
+      exchange = PinotLogicalExchange.create(newLeafAgg, RelDistributions.hash(groupSetIndices));
     }
 
     // 3. attach intermediate agg stage.
@@ -146,7 +146,7 @@ public class PinotAggregateExchangeNodeInsertRule extends RelOptRule {
     call.transformTo(newAggNode);
   }
 
-  private RelNode makeNewIntermediateAgg(RelOptRuleCall ruleCall, Aggregate oldAggRel, LogicalExchange exchange,
+  private RelNode makeNewIntermediateAgg(RelOptRuleCall ruleCall, Aggregate oldAggRel, PinotLogicalExchange exchange,
       boolean isLeafStageAggregationPresent, List<Integer> argList, List<Integer> groupByList) {
 
     // add the exchange as the input node to the relation builder.
@@ -257,7 +257,7 @@ public class PinotAggregateExchangeNodeInsertRule extends RelOptRule {
     }
 
     // 2. Create an exchange on top of the LogicalProject.
-    LogicalExchange exchange = LogicalExchange.create(project, RelDistributions.hash(newAggGroupByColumns));
+    PinotLogicalExchange exchange = PinotLogicalExchange.create(project, RelDistributions.hash(newAggGroupByColumns));
 
     // 3. Create an intermediate stage aggregation.
     RelNode newAggNode =
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
index 77fce00ea0..3dcad6998f 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
@@ -27,8 +27,8 @@ import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.hint.PinotHintOptions;
 import org.apache.calcite.rel.hint.PinotHintStrategyTable;
-import org.apache.calcite.rel.logical.LogicalExchange;
 import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
 import org.apache.calcite.tools.RelBuilderFactory;
 
 
@@ -69,16 +69,16 @@ public class PinotJoinExchangeNodeInsertRule extends RelOptRule {
         PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
     if (isColocatedJoin) {
       // join exchange are colocated, we should directly pass through via join key
-      leftExchange = LogicalExchange.create(leftInput, RelDistributions.SINGLETON);
-      rightExchange = LogicalExchange.create(rightInput, RelDistributions.SINGLETON);
+      leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.SINGLETON);
+      rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.SINGLETON);
     } else if (joinInfo.leftKeys.isEmpty()) {
       // when there's no JOIN key, use broadcast.
-      leftExchange = LogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
-      rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
+      leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
+      rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
     } else {
       // when join key exists, use hash distribution.
-      leftExchange = LogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
-      rightExchange = LogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
+      leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
+      rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
     }
 
     RelNode newJoinNode =
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
index 7fee810386..3512cc581d 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
@@ -26,13 +26,15 @@ import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.hint.PinotHintOptions;
 import org.apache.calcite.rel.hint.PinotHintStrategyTable;
-import org.apache.calcite.rel.logical.LogicalExchange;
 import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.zookeeper.common.StringUtils;
 
@@ -140,7 +142,7 @@ public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
         : join.getLeft();
     RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel()
         : join.getRight();
-    return left instanceof LogicalExchange && right instanceof LogicalExchange
+    return left instanceof Exchange && right instanceof Exchange
         && PinotRuleUtils.noExchangeInSubtree(left.getInput(0))
         && (join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty());
   }
@@ -148,23 +150,25 @@ public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
   @Override
   public void onMatch(RelOptRuleCall call) {
     Join join = call.rel(0);
-    LogicalExchange left = (LogicalExchange) (join.getLeft() instanceof HepRelVertex
+    PinotLogicalExchange left = (PinotLogicalExchange) (join.getLeft() instanceof HepRelVertex
         ? ((HepRelVertex) join.getLeft()).getCurrentRel() : join.getLeft());
-    LogicalExchange right = (LogicalExchange) (join.getRight() instanceof HepRelVertex
+    PinotLogicalExchange right = (PinotLogicalExchange) (join.getRight() instanceof HepRelVertex
         ? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());
 
     boolean isColocatedJoin = PinotHintStrategyTable.containsHintOption(join.getHints(),
         PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
-    LogicalExchange dynamicBroadcastExchange = isColocatedJoin
-        ? LogicalExchange.create(right.getInput(), RelDistributions.SINGLETON)
-        : LogicalExchange.create(right.getInput(), RelDistributions.BROADCAST_DISTRIBUTED);
+    PinotLogicalExchange dynamicBroadcastExchange = isColocatedJoin
+        ? PinotLogicalExchange.create(right.getInput(), RelDistributions.SINGLETON,
+            PinotRelExchangeType.PIPELINE_BREAKER)
+        : PinotLogicalExchange.create(right.getInput(), RelDistributions.BROADCAST_DISTRIBUTED,
+            PinotRelExchangeType.PIPELINE_BREAKER);
     Join dynamicFilterJoin =
         new LogicalJoin(join.getCluster(), join.getTraitSet(), left.getInput(), dynamicBroadcastExchange,
             join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
             ImmutableList.copyOf(join.getSystemFieldList()));
     // adding pass-through exchange after join b/c currently leaf-stage doesn't support chaining operator(s) after JOIN
-    LogicalExchange passThroughAfterJoinExchange =
-        LogicalExchange.create(dynamicFilterJoin, RelDistributions.SINGLETON);
+    PinotLogicalExchange passThroughAfterJoinExchange =
+        PinotLogicalExchange.create(dynamicFilterJoin, RelDistributions.SINGLETON);
     call.transformTo(passThroughAfterJoinExchange);
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
index 5c52512def..530704750f 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
@@ -26,7 +26,6 @@ import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.logical.LogicalExchange;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.tools.RelBuilderFactory;
 
@@ -71,7 +70,7 @@ public class PinotRuleUtils {
     if (relNode instanceof HepRelVertex) {
       relNode = ((HepRelVertex) relNode).getCurrentRel();
     }
-    if (relNode instanceof LogicalExchange) {
+    if (relNode instanceof Exchange) {
       return false;
     }
     for (RelNode child : relNode.getInputs()) {
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
index 16227a1f83..9fe933aa4b 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSetOpExchangeNodeInsertRule.java
@@ -27,10 +27,10 @@ import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.SetOp;
-import org.apache.calcite.rel.logical.LogicalExchange;
 import org.apache.calcite.rel.logical.LogicalIntersect;
 import org.apache.calcite.rel.logical.LogicalMinus;
 import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
 import org.apache.calcite.tools.RelBuilderFactory;
 
 
@@ -69,7 +69,7 @@ public class PinotSetOpExchangeNodeInsertRule extends RelOptRule {
     List<Integer> hashFields =
         IntStream.range(0, setOp.getRowType().getFieldCount()).boxed().collect(Collectors.toCollection(ArrayList::new));
     for (RelNode input : setOp.getInputs()) {
-      RelNode exchange = LogicalExchange.create(input, RelDistributions.hash(hashFields));
+      RelNode exchange = PinotLogicalExchange.create(input, RelDistributions.hash(hashFields));
       newInputs.add(exchange);
     }
     SetOp newSetOpNode;
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
index ae1ae851b8..da9a8d98ed 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
@@ -34,9 +34,9 @@ import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.Window;
-import org.apache.calcite.rel.logical.LogicalExchange;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
 import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -92,7 +92,7 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
     Window.Group windowGroup = window.groups.get(0);
     if (windowGroup.keys.isEmpty() && windowGroup.orderKeys.getKeys().isEmpty()) {
       // Empty OVER()
-      // Add a single LogicalExchange for empty OVER() since no sort is required
+      // Add a single Exchange for empty OVER() since no sort is required
 
       if (PinotRuleUtils.isProject(windowInput)) {
         // Check for empty LogicalProject below LogicalWindow. If present modify it to be a Literal only project and add
@@ -105,7 +105,8 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
         }
       }
 
-      LogicalExchange exchange = LogicalExchange.create(windowInput, RelDistributions.hash(Collections.emptyList()));
+      PinotLogicalExchange exchange = PinotLogicalExchange.create(windowInput,
+          RelDistributions.hash(Collections.emptyList()));
       call.transformTo(
           LogicalWindow.create(window.getTraitSet(), exchange, window.constants, window.getRowType(), window.groups));
     } else if (windowGroup.keys.isEmpty() && !windowGroup.orderKeys.getKeys().isEmpty()) {
@@ -126,8 +127,8 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
 
       if (isPartitionByOnly) {
         // Only PARTITION BY or PARTITION BY and ORDER BY on the same key(s)
-        // Add a LogicalExchange hashed on the partition by keys
-        LogicalExchange exchange = LogicalExchange.create(windowInput,
+        // Add an Exchange hashed on the partition by keys
+        PinotLogicalExchange exchange = PinotLogicalExchange.create(windowInput,
             RelDistributions.hash(windowGroup.keys.toList()));
         call.transformTo(LogicalWindow.create(window.getTraitSet(), exchange, window.constants, window.getRowType(),
             window.groups));
@@ -217,7 +218,7 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
    *
    * This function modifies the empty LogicalProject below the LogicalWindow to add a literal and adds a LogicalProject
    * above LogicalWindow to remove the additional literal column from being projected any further. This also handles
-   * the addition of the LogicalExchange under the LogicalWindow.
+   * the addition of the Exchange under the LogicalWindow.
    *
    * TODO: Explore an option to handle empty LogicalProject by actually projecting empty rows for each entry. This way
    *       there will no longer be a need to add a literal to the empty LogicalProject, but just traverse the number of
@@ -240,9 +241,9 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
     outputBuilder.addAll(window.getRowType().getFieldList());
 
     // This scenario is only possible for empty OVER() which uses functions that have no arguments such as COUNT(*) or
-    // ROW_NUMBER(). Add a LogicalExchange with empty hash distribution list
-    LogicalExchange exchange =
-        LogicalExchange.create(projectBelowWindow, RelDistributions.hash(Collections.emptyList()));
+    // ROW_NUMBER(). Add an Exchange with empty hash distribution list
+    PinotLogicalExchange exchange =
+        PinotLogicalExchange.create(projectBelowWindow, RelDistributions.hash(Collections.emptyList()));
     Window newWindow = new LogicalWindow(window.getCluster(), window.getTraitSet(), exchange,
         window.getConstants(), outputBuilder.build(), window.groups);
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragment.java
index b8dd4e1be6..ad0f846180 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlanFragment.java
@@ -24,7 +24,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
 
 /**
  * The {@code PlanFragment} is the logical sub query plan that should be scheduled together from the result of
- * {@link org.apache.pinot.query.planner.logical.PinotQueryFragmenter}.
+ * {@link org.apache.pinot.query.planner.logical.PlanFragmenter}.
  *
  */
 public class PlanFragment {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
index 795276b1e5..924a580fa5 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.PlanFragmentMetadata;
 import org.apache.pinot.query.planner.QueryPlan;
@@ -92,12 +93,14 @@ public class PinotLogicalQueryPlanner {
       // receiver so doesn't matter what the exchange type is. setting it to SINGLETON by default.
       PlanNode subPlanRootSenderNode =
           new MailboxSendNode(subPlanRoot.getPlanFragmentId(), subPlanRoot.getDataSchema(),
-              0, RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false);
+              0, RelDistribution.Type.RANDOM_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
+              false);
       subPlanRootSenderNode.addInput(subPlanRoot);
 
       PlanNode subPlanRootReceiverNode =
           new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(),
-              RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false, false, subPlanRootSenderNode);
+              RelDistribution.Type.RANDOM_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
+              false, false, subPlanRootSenderNode);
       subPlanRoot = subPlanRootReceiverNode;
       PlanFragment planFragment1 = planFragmentContext._planFragmentIdToRootNodeMap.get(1);
       planFragmentContext._planFragmentIdToRootNodeMap.put(1,
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
index 1c4c90f958..8ea16e8828 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.PlanFragmentMetadata;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
@@ -132,19 +133,21 @@ public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.
     PlanNode nextPlanFragmentRoot = node.getInputs().get(0).visit(this, context);
 
     List<Integer> distributionKeys = node.getDistributionKeys();
-    RelDistribution.Type exchangeType = node.getDistributionType();
+    RelDistribution.Type distributionType = node.getDistributionType();
+    PinotRelExchangeType exchangeType = node.getExchangeType();
 
     // make an exchange sender and receiver node pair
     // only HASH_DISTRIBUTED requires a partition key selector; so all other types (SINGLETON and BROADCAST)
     // of exchange will not carry a partition key selector.
-    KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+    KeySelector<Object[], Object[]> keySelector = distributionType == RelDistribution.Type.HASH_DISTRIBUTED
         ? new FieldSelectionKeySelector(distributionKeys) : null;
 
     PlanNode mailboxSender =
         new MailboxSendNode(nextPlanFragmentId, nextPlanFragmentRoot.getDataSchema(),
-            currentPlanFragmentId, exchangeType, keySelector, node.getCollations(), node.isSortOnSender());
+            currentPlanFragmentId, distributionType, exchangeType, keySelector, node.getCollations(),
+            node.isSortOnSender());
     PlanNode mailboxReceiver = new MailboxReceiveNode(currentPlanFragmentId, nextPlanFragmentRoot.getDataSchema(),
-        nextPlanFragmentId, exchangeType, keySelector,
+        nextPlanFragmentId, distributionType, exchangeType, keySelector,
         node.getCollations(), node.isSortOnSender(), node.isSortOnReceiver(), mailboxSender);
     mailboxSender.addInput(nextPlanFragmentRoot);
 
@@ -159,8 +162,7 @@ public class PlanFragmenter implements PlanNodeVisitor<PlanNode, PlanFragmenter.
   }
 
   private boolean isPlanFragmentSplitter(PlanNode node) {
-    // TODO: always return true for now, we will add more logic here later.
-    return true;
+    return ((ExchangeNode) node).getExchangeType() != PinotRelExchangeType.SUB_PLAN;
   }
 
   public static class Context {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index 7254ef60dc..63da99df08 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -39,7 +39,9 @@ import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.logical.PinotLogicalExchange;
 import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelRecordType;
@@ -108,20 +110,27 @@ public final class RelToPlanNodeConverter {
     RelCollation collation = null;
     boolean isSortOnSender = false;
     boolean isSortOnReceiver = false;
+    PinotRelExchangeType exchangeType = PinotRelExchangeType.getDefaultExchangeType();
     if (node instanceof SortExchange) {
       collation = ((SortExchange) node).getCollation();
       if (node instanceof PinotLogicalSortExchange) {
         // These flags only take meaning if the collation is not null or empty
         isSortOnSender = ((PinotLogicalSortExchange) node).isSortOnSender();
         isSortOnReceiver = ((PinotLogicalSortExchange) node).isSortOnReceiver();
+        exchangeType = ((PinotLogicalSortExchange) node).getExchangeType();
+      }
+    } else {
+      if (node instanceof PinotLogicalExchange) {
+        exchangeType = ((PinotLogicalExchange) node).getExchangeType();
       }
     }
     List<RelFieldCollation> fieldCollations = (collation == null) ? null : collation.getFieldCollations();
 
     // Compute all the tables involved under this exchange node
     Set<String> tableNames = getTableNamesFromRelRoot(node);
-    return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()), tableNames, node.getDistribution(),
-        fieldCollations, isSortOnSender, isSortOnReceiver);
+
+    return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()), exchangeType,
+        tableNames, node.getDistribution(), fieldCollations, isSortOnSender, isSortOnReceiver);
   }
 
   private static PlanNode convertLogicalSetOp(SetOp node, int currentStageId) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
index 54d03da712..ba06f3f405 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
@@ -136,7 +136,7 @@ public class ShuffleRewriteVisitor implements PlanNodeVisitor<Set<Integer>, Void
     KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
 
     if (canSkipShuffle(oldPartitionKeys, selector)) {
-      node.setExchangeType(RelDistribution.Type.SINGLETON);
+      node.setDistributionType(RelDistribution.Type.SINGLETON);
       return oldPartitionKeys;
     } else if (selector == null) {
       return new HashSet<>();
@@ -151,7 +151,7 @@ public class ShuffleRewriteVisitor implements PlanNodeVisitor<Set<Integer>, Void
     KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
 
     if (canSkipShuffle(oldPartitionKeys, selector)) {
-      node.setExchangeType(RelDistribution.Type.SINGLETON);
+      node.setDistributionType(RelDistribution.Type.SINGLETON);
       return oldPartitionKeys;
     } else {
       // reset the context partitionKeys since we've determined that
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java
index adb87d1ad8..938920ea57 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/SubPlanFragmenter.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.query.planner.SubPlanMetadata;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.apache.pinot.query.planner.plannode.ExchangeNode;
@@ -138,8 +139,7 @@ public class SubPlanFragmenter implements PlanNodeVisitor<PlanNode, SubPlanFragm
   }
 
   private boolean isSubPlanSplitter(PlanNode node) {
-    // TODO: implement this when we introduce a new type of exchange node for sub-plan splitter
-    return false;
+    return ((ExchangeNode) node).getExchangeType() == PinotRelExchangeType.SUB_PLAN;
   }
 
   public static class Context {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
index 0057dc35ab..50f78e64bf 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
@@ -173,7 +173,7 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
         return new HashSet<>();
       } else if (colocationKeyCondition(oldColocationKeys, selector) && areServersSuperset(node.getPlanFragmentId(),
           node.getSenderStageId())) {
-        node.setExchangeType(RelDistribution.Type.SINGLETON);
+        node.setDistributionType(RelDistribution.Type.SINGLETON);
         _dispatchablePlanMetadataMap.get(node.getPlanFragmentId()).setServerInstanceToWorkerIdMap(
             _dispatchablePlanMetadataMap.get(node.getSenderStageId()).getServerInstanceToWorkerIdMap());
         return oldColocationKeys;
@@ -187,10 +187,10 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
     }
     // If the current stage is a join-stage then we already know whether shuffle can be skipped.
     if (_canSkipShuffleForJoin) {
-      node.setExchangeType(RelDistribution.Type.SINGLETON);
+      node.setDistributionType(RelDistribution.Type.SINGLETON);
       // For the join-case, servers are already re-assigned in visitJoin. Moreover, we haven't yet changed sender's
       // distribution.
-      ((MailboxSendNode) node.getSender()).setExchangeType(RelDistribution.Type.SINGLETON);
+      ((MailboxSendNode) node.getSender()).setDistributionType(RelDistribution.Type.SINGLETON);
       return oldColocationKeys;
     } else if (selector == null) {
       return new HashSet<>();
@@ -214,7 +214,7 @@ public class GreedyShuffleRewriteVisitor implements PlanNodeVisitor<Set<Colocati
       Set<ColocationKey> colocationKeys;
       if (canSkipShuffleBasic && areServersSuperset(node.getReceiverStageId(), node.getPlanFragmentId())) {
         // Servers are not re-assigned on sender-side. If needed, they are re-assigned on the receiver side.
-        node.setExchangeType(RelDistribution.Type.SINGLETON);
+        node.setDistributionType(RelDistribution.Type.SINGLETON);
         colocationKeys = oldColocationKeys;
       } else {
         colocationKeys = new HashSet<>();
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
index 46baec9be5..1d136efb20 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/ExchangeNode.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Set;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
@@ -33,7 +34,10 @@ import org.apache.pinot.query.planner.serde.ProtoProperties;
 public class ExchangeNode extends AbstractPlanNode {
 
   @ProtoProperties
-  private RelDistribution.Type _exchangeType;
+  private PinotRelExchangeType _exchangeType;
+
+  @ProtoProperties
+  private RelDistribution.Type _distributionType;
 
   @ProtoProperties
   private List<Integer> _keys;
@@ -57,12 +61,13 @@ public class ExchangeNode extends AbstractPlanNode {
     super(planFragmentId);
   }
 
-  public ExchangeNode(int currentStageId, DataSchema dataSchema, Set<String> tableNames, RelDistribution distribution,
-      List<RelFieldCollation> collations, boolean isSortOnSender,
+  public ExchangeNode(int currentStageId, DataSchema dataSchema, PinotRelExchangeType exchangeType,
+      Set<String> tableNames, RelDistribution distribution, List<RelFieldCollation> collations, boolean isSortOnSender,
       boolean isSortOnReceiver) {
     super(currentStageId, dataSchema);
+    _exchangeType = exchangeType;
     _keys = distribution.getKeys();
-    _exchangeType = distribution.getType();
+    _distributionType = distribution.getType();
     _isSortOnSender = isSortOnSender;
     _isSortOnReceiver = isSortOnReceiver;
     _collations = collations;
@@ -79,10 +84,14 @@ public class ExchangeNode extends AbstractPlanNode {
     return visitor.visitExchange(this, context);
   }
 
-  public RelDistribution.Type getDistributionType() {
+  public PinotRelExchangeType getExchangeType() {
     return _exchangeType;
   }
 
+  public RelDistribution.Type getDistributionType() {
+    return _distributionType;
+  }
+
   public List<Integer> getDistributionKeys() {
     return _keys;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
index e67b8343ed..c2515edbb6 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
@@ -38,7 +39,9 @@ public class MailboxReceiveNode extends AbstractPlanNode {
   @ProtoProperties
   private int _senderStageId;
   @ProtoProperties
-  private RelDistribution.Type _exchangeType;
+  private RelDistribution.Type _distributionType;
+  @ProtoProperties
+  private PinotRelExchangeType _exchangeType;
   @ProtoProperties
   private KeySelector<Object[], Object[]> _partitionKeySelector;
   @ProtoProperties
@@ -61,11 +64,13 @@ public class MailboxReceiveNode extends AbstractPlanNode {
   }
 
   public MailboxReceiveNode(int planFragmentId, DataSchema dataSchema, int senderStageId,
-      RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
+      RelDistribution.Type distributionType, PinotRelExchangeType exchangeType,
+      @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
       @Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender, boolean isSortOnReceiver,
       PlanNode sender) {
     super(planFragmentId, dataSchema);
     _senderStageId = senderStageId;
+    _distributionType = distributionType;
     _exchangeType = exchangeType;
     _partitionKeySelector = partitionKeySelector;
     if (!CollectionUtils.isEmpty(fieldCollations)) {
@@ -104,11 +109,19 @@ public class MailboxReceiveNode extends AbstractPlanNode {
     return _senderStageId;
   }
 
-  public void setExchangeType(RelDistribution.Type exchangeType) {
+  public void setDistributionType(RelDistribution.Type distributionType) {
+    _distributionType = distributionType;
+  }
+
+  public RelDistribution.Type getDistributionType() {
+    return _distributionType;
+  }
+
+  public void setExchangeType(PinotRelExchangeType exchangeType) {
     _exchangeType = exchangeType;
   }
 
-  public RelDistribution.Type getExchangeType() {
+  public PinotRelExchangeType getExchangeType() {
     return _exchangeType;
   }
 
@@ -142,7 +155,7 @@ public class MailboxReceiveNode extends AbstractPlanNode {
 
   @Override
   public String explain() {
-    return "MAIL_RECEIVE(" + _exchangeType + ")";
+    return "MAIL_RECEIVE(" + _distributionType + ")";
   }
 
   @Override
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
index 08624959b7..1edf2902df 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java
@@ -25,6 +25,7 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
@@ -36,7 +37,9 @@ public class MailboxSendNode extends AbstractPlanNode {
   @ProtoProperties
   private int _receiverStageId;
   @ProtoProperties
-  private RelDistribution.Type _exchangeType;
+  private RelDistribution.Type _distributionType;
+  @ProtoProperties
+  private PinotRelExchangeType _exchangeType;
   @ProtoProperties
   private KeySelector<Object[], Object[]> _partitionKeySelector;
   @ProtoProperties
@@ -51,10 +54,12 @@ public class MailboxSendNode extends AbstractPlanNode {
   }
 
   public MailboxSendNode(int planFragmentId, DataSchema dataSchema, int receiverStageId,
-      RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
+      RelDistribution.Type distributionType, PinotRelExchangeType exchangeType,
+      @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
       @Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender) {
     super(planFragmentId, dataSchema);
     _receiverStageId = receiverStageId;
+    _distributionType = distributionType;
     _exchangeType = exchangeType;
     _partitionKeySelector = partitionKeySelector;
     // TODO: Support ordering here if the 'fieldCollations' aren't empty and 'sortOnSender' is true
@@ -81,11 +86,19 @@ public class MailboxSendNode extends AbstractPlanNode {
     _receiverStageId = receiverStageId;
   }
 
-  public void setExchangeType(RelDistribution.Type exchangeType) {
+  public void setDistributionType(RelDistribution.Type distributionType) {
+    _distributionType = distributionType;
+  }
+
+  public RelDistribution.Type getDistributionType() {
+    return _distributionType;
+  }
+
+  public void setExchangeType(PinotRelExchangeType exchangeType) {
     _exchangeType = exchangeType;
   }
 
-  public RelDistribution.Type getExchangeType() {
+  public PinotRelExchangeType getExchangeType() {
     return _exchangeType;
   }
 
@@ -107,7 +120,7 @@ public class MailboxSendNode extends AbstractPlanNode {
 
   @Override
   public String explain() {
-    return "MAIL_SEND(" + _exchangeType + ")";
+    return "MAIL_SEND(" + _distributionType + ")";
   }
 
   @Override
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index c3e45746bf..0073455ed1 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -89,17 +89,17 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
             // JOIN is exchanged with hash distribution (data shuffle)
             MailboxReceiveNode left = (MailboxReceiveNode) node.getInputs().get(0);
             MailboxReceiveNode right = (MailboxReceiveNode) node.getInputs().get(1);
-            Assert.assertEquals(left.getExchangeType(), RelDistribution.Type.HASH_DISTRIBUTED);
-            Assert.assertEquals(right.getExchangeType(), RelDistribution.Type.HASH_DISTRIBUTED);
+            Assert.assertEquals(left.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED);
+            Assert.assertEquals(right.getDistributionType(), RelDistribution.Type.HASH_DISTRIBUTED);
             break;
           }
           if (node instanceof AggregateNode && node.getInputs().get(0) instanceof MailboxReceiveNode) {
             // AGG is exchanged with singleton since it has already been distributed by JOIN.
             MailboxReceiveNode input = (MailboxReceiveNode) node.getInputs().get(0);
             if (shouldRewrite) {
-              Assert.assertEquals(input.getExchangeType(), RelDistribution.Type.SINGLETON);
+              Assert.assertEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON);
             } else {
-              Assert.assertNotEquals(input.getExchangeType(), RelDistribution.Type.SINGLETON);
+              Assert.assertNotEquals(input.getDistributionType(), RelDistribution.Type.SINGLETON);
             }
             break;
           }
@@ -330,18 +330,18 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
         new Object[]{"EXPLAIN PLAN EXCLUDING ATTRIBUTES AS DOT FOR SELECT col1, COUNT(*) FROM a GROUP BY col1",
               "Execution Plan\n"
             + "digraph {\n"
-            + "\"LogicalExchange\\n\" -> \"LogicalAggregate\\n\" [label=\"0\"]\n"
-            + "\"LogicalAggregate\\n\" -> \"LogicalExchange\\n\" [label=\"0\"]\n"
+            + "\"PinotLogicalExchange\\n\" -> \"LogicalAggregate\\n\" [label=\"0\"]\n"
+            + "\"LogicalAggregate\\n\" -> \"PinotLogicalExchange\\n\" [label=\"0\"]\n"
             + "\"LogicalTableScan\\n\" -> \"LogicalAggregate\\n\" [label=\"0\"]\n"
             + "}\n"},
         new Object[]{"EXPLAIN PLAN FOR SELECT a.col1, b.col3 FROM a JOIN b ON a.col1 = b.col1",
               "Execution Plan\n"
             + "LogicalProject(col1=[$0], col3=[$2])\n"
             + "  LogicalJoin(condition=[=($0, $1)], joinType=[inner])\n"
-            + "    LogicalExchange(distribution=[hash[0]])\n"
+            + "    PinotLogicalExchange(distribution=[hash[0]])\n"
             + "      LogicalProject(col1=[$0])\n"
             + "        LogicalTableScan(table=[[a]])\n"
-            + "    LogicalExchange(distribution=[hash[0]])\n"
+            + "    PinotLogicalExchange(distribution=[hash[0]])\n"
             + "      LogicalProject(col1=[$0], col3=[$2])\n"
             + "        LogicalTableScan(table=[[b]])\n"
         },
diff --git a/pinot-query-planner/src/test/resources/queries/AggregatePlans.json b/pinot-query-planner/src/test/resources/queries/AggregatePlans.json
index 7f4dcf0f47..ffb95c703e 100644
--- a/pinot-query-planner/src/test/resources/queries/AggregatePlans.json
+++ b/pinot-query-planner/src/test/resources/queries/AggregatePlans.json
@@ -8,7 +8,7 @@
           "Execution Plan",
           "\nLogicalProject(avg=[/(CASE(=($1, 0), null:DECIMAL(1000, 0), $0), $1)])",
           "\n  LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n        LogicalProject(col2=[$1], col3=[$2], col4=[$3])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
@@ -23,7 +23,7 @@
           "Execution Plan",
           "\nLogicalProject(avg=[/(CASE(=($1, 0), null:DECIMAL(1000, 0), $0), $1)], sum=[CASE(=($1, 0), null:DECIMAL(1000, 0), $0)], max=[$2])",
           "\n  LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)], max=[MAX($2)])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{}], agg#0=[$SUM0($2)], agg#1=[COUNT()], max=[MAX($2)])",
           "\n        LogicalProject(col2=[$1], col3=[$2], col4=[$3])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
@@ -38,7 +38,7 @@
           "Execution Plan",
           "\nLogicalProject(avg=[/(CAST(CASE(=($1, 0), null:INTEGER, $0)):DOUBLE, $1)], count=[$1])",
           "\n  LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{}], agg#0=[$SUM0($1)], agg#1=[COUNT()])",
           "\n        LogicalProject(col2=[$1], col3=[$2])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
@@ -53,7 +53,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[CASE(=($1, 0), null:INTEGER, $0)], EXPR$1=[$1])",
           "\n  LogicalAggregate(group=[{}], EXPR$0=[$SUM0($0)], agg#1=[$SUM0($1)])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{}], EXPR$0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -66,7 +66,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[CASE(=($1, 0), null:INTEGER, $0)], EXPR$1=[$1])",
           "\n  LogicalAggregate(group=[{}], EXPR$0=[$SUM0($0)], agg#1=[$SUM0($1)])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{}], EXPR$0=[$SUM0($1)], agg#1=[COUNT()])",
           "\n        LogicalProject(col2=[$1], col3=[$2])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
@@ -81,7 +81,7 @@
           "Execution Plan",
           "\nLogicalProject(sum=[CASE(=($1, 0), null:INTEGER, $0)], count=[$1])",
           "\n  LogicalAggregate(group=[{}], sum=[$SUM0($0)], agg#1=[$SUM0($1)])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{}], sum=[$SUM0($1)], agg#1=[COUNT()])",
           "\n        LogicalProject(col2=[$1], col3=[$2])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
@@ -96,7 +96,7 @@
           "Execution Plan",
           "\nLogicalProject(avg=[/(CAST(CASE(=($1, 0), null:INTEGER, $0)):DOUBLE, $1)], count=[$1])",
           "\n  LogicalAggregate(group=[{}], agg#0=[$SUM0($0)], agg#1=[$SUM0($1)])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{}], agg#0=[$SUM0($1)], agg#1=[COUNT()])",
           "\n        LogicalProject(col2=[$1], col3=[$2])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
@@ -111,7 +111,7 @@
           "Execution Plan",
           "\nLogicalProject(sum=[CASE(=($1, 0), null:INTEGER, $0)], count=[$1])",
           "\n  LogicalAggregate(group=[{}], sum=[$SUM0($0)], agg#1=[$SUM0($1)])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{}], sum=[$SUM0($1)], agg#1=[COUNT()])",
           "\n        LogicalProject(col2=[$1], col3=[$2])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'pink floyd'))])",
diff --git a/pinot-query-planner/src/test/resources/queries/GroupByPlans.json b/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
index 8ac6fdf7f0..c05cfba1b5 100644
--- a/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/GroupByPlans.json
@@ -7,7 +7,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)])",
           "\n      LogicalTableScan(table=[[a]])",
           "\n"
@@ -20,7 +20,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[/(CAST($1):DOUBLE NOT NULL, $2)], EXPR$3=[$3], EXPR$4=[$4])",
           "\n  LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)], agg#1=[$SUM0($2)], EXPR$3=[MAX($3)], EXPR$4=[MIN($4)])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)], agg#1=[COUNT()], EXPR$3=[MAX($2)], EXPR$4=[MIN($2)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -32,7 +32,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
@@ -47,7 +47,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalAggregate(group=[{0}], EXPR$1=[COUNT()])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
@@ -62,7 +62,7 @@
           "Execution Plan",
           "\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
           "\n  LogicalAggregate(group=[{0, 1}], EXPR$2=[$SUM0($2)])",
-          "\n    LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n      LogicalAggregate(group=[{0, 1}], EXPR$2=[$SUM0($2)])",
           "\n        LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($0, 'a'))])",
@@ -78,7 +78,7 @@
           "\nLogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[$2])",
           "\n  LogicalFilter(condition=[AND(>($1, 10), >=($3, 0), <($4, 20), <=($2, 10), =(/(CAST($2):DOUBLE NOT NULL, $1), 5))])",
           "\n    LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)], EXPR$2=[$SUM0($2)], agg#2=[MAX($3)], agg#3=[MIN($4)])",
-          "\n      LogicalExchange(distribution=[hash[0]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
           "\n        LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[$SUM0($2)], agg#2=[MAX($2)], agg#3=[MIN($2)])",
           "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n            LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
@@ -94,7 +94,7 @@
           "\nLogicalProject(value1=[$0], count=[$1], SUM=[$2])",
           "\n  LogicalFilter(condition=[AND(>($1, 10), >=($3, 0), <($4, 20), <=($2, 10), =(/(CAST($2):DOUBLE NOT NULL, $1), 5))])",
           "\n    LogicalAggregate(group=[{0}], count=[$SUM0($1)], SUM=[$SUM0($2)], agg#2=[MAX($3)], agg#3=[MIN($4)])",
-          "\n      LogicalExchange(distribution=[hash[0]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
           "\n        LogicalAggregate(group=[{0}], count=[COUNT()], SUM=[$SUM0($2)], agg#2=[MAX($2)], agg#3=[MIN($2)])",
           "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n            LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
@@ -108,7 +108,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalProject(col1=[$0], col3=[$2])",
           "\n      LogicalTableScan(table=[[a]])",
           "\n"
@@ -121,7 +121,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($1):DOUBLE NOT NULL, $2)])",
           "\n  LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT()])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -134,7 +134,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[/(CAST($1):DOUBLE NOT NULL, $2)], EXPR$3=[$3], EXPR$4=[$4])",
           "\n  LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)], agg#1=[COUNT()], EXPR$3=[MAX($1)], EXPR$4=[MIN($1)])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -146,7 +146,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n      LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
           "\n        LogicalTableScan(table=[[a]])",
@@ -159,7 +159,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)], EXPR$2=[MAX($2)])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n      LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
           "\n        LogicalTableScan(table=[[a]])",
@@ -173,7 +173,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0}], EXPR$1=[COUNT()])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n      LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
           "\n        LogicalTableScan(table=[[a]])",
@@ -187,7 +187,7 @@
           "Execution Plan",
           "\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
           "\n  LogicalAggregate(group=[{0, 1}], EXPR$2=[$SUM0($2)])",
-          "\n    LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalFilter(condition=[AND(>=($2, 0), =($0, 'a'))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -202,7 +202,7 @@
           "\nLogicalProject(col1=[$0], EXPR$1=[$1], EXPR$2=[$2])",
           "\n  LogicalFilter(condition=[AND(>($1, 10), >=($3, 0), <($4, 20), <=($2, 10), =(/(CAST($2):DOUBLE NOT NULL, $1), 5))])",
           "\n    LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[$SUM0($2)], agg#2=[MAX($2)], agg#3=[MIN($2)])",
-          "\n      LogicalExchange(distribution=[hash[0]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
           "\n        LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
           "\n            LogicalTableScan(table=[[a]])",
@@ -217,7 +217,7 @@
           "\nLogicalProject(col1=[$0], EXPR$1=[$1])",
           "\n  LogicalFilter(condition=[AND(>=($2, 0), <($3, 20), <=($1, 10), =(/(CAST($1):DOUBLE NOT NULL, $4), 5))])",
           "\n    LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)], agg#1=[MAX($2)], agg#2=[MIN($2)], agg#3=[COUNT()])",
-          "\n      LogicalExchange(distribution=[hash[0]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
           "\n        LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
           "\n            LogicalTableScan(table=[[a]])",
@@ -232,7 +232,7 @@
           "\nLogicalProject(value1=[$0], count=[$1], SUM=[$2])",
           "\n  LogicalFilter(condition=[AND(>($1, 10), >=($3, 0), <($4, 20), <=($2, 10), =(/(CAST($2):DOUBLE NOT NULL, $1), 5))])",
           "\n    LogicalAggregate(group=[{0}], count=[COUNT()], SUM=[$SUM0($2)], agg#2=[MAX($2)], agg#3=[MIN($2)])",
-          "\n      LogicalExchange(distribution=[hash[0]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
           "\n        LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
           "\n            LogicalTableScan(table=[[a]])",
diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
index 2ccebd1668..ea5576e727 100644
--- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
@@ -11,10 +11,10 @@
           "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n        LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col1=[$0], ts=[$4])",
           "\n              LogicalTableScan(table=[[a]])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[b]])",
           "\n"
@@ -30,10 +30,10 @@
           "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n        LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col1=[$0], ts=[$4])",
           "\n              LogicalTableScan(table=[[a]])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[b]])",
           "\n"
@@ -45,9 +45,9 @@
         "output": [
           "Execution Plan",
           "\nLogicalJoin(condition=[=($0, $6)], joinType=[inner])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalTableScan(table=[[a]])",
-          "\n  LogicalExchange(distribution=[hash[1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[1]])",
           "\n    LogicalTableScan(table=[[b]])",
           "\n"
         ]
@@ -58,10 +58,10 @@
         "output": [
           "Execution Plan",
           "\nLogicalJoin(condition=[=($0, $6)], joinType=[inner])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalFilter(condition=[>=($2, 0)])",
           "\n      LogicalTableScan(table=[[a]])",
-          "\n  LogicalExchange(distribution=[hash[1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[1]])",
           "\n    LogicalTableScan(table=[[b]])",
           "\n"
         ]
@@ -72,10 +72,10 @@
         "output": [
           "Execution Plan",
           "\nLogicalJoin(condition=[AND(=($0, $6), >($2, $7))], joinType=[inner])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalFilter(condition=[>=($2, 0)])",
           "\n      LogicalTableScan(table=[[a]])",
-          "\n  LogicalExchange(distribution=[hash[1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[1]])",
           "\n    LogicalTableScan(table=[[b]])",
           "\n"
         ]
@@ -86,9 +86,9 @@
         "output": [
           "Execution Plan",
           "\nLogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[inner])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalTableScan(table=[[a]])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalTableScan(table=[[b]])",
           "\n"
         ]
@@ -100,11 +100,11 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], ts=[$1], col3=[$3])",
           "\n  LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], ts=[$4])",
           "\n        LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
           "\n          LogicalTableScan(table=[[a]])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalFilter(condition=[<($2, 0)])",
           "\n          LogicalTableScan(table=[[b]])",
@@ -118,14 +118,14 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($1):DOUBLE NOT NULL, $2)])",
           "\n  LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n        LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col1=[$0])",
           "\n              LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
           "\n                LogicalTableScan(table=[[a]])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col2=[$1], col3=[$2])",
           "\n              LogicalFilter(condition=[<($2, 0)])",
           "\n                LogicalTableScan(table=[[b]])",
@@ -139,10 +139,10 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[dateTrunc('DAY', +($2, $5))])",
           "\n  LogicalJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[inner])",
-          "\n    LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], ts=[$4])",
           "\n        LogicalTableScan(table=[[a]])",
-          "\n    LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], ts=[$4])",
           "\n        LogicalTableScan(table=[[b]])",
           "\n"
@@ -154,14 +154,14 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0, 1}])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalAggregate(group=[{1, 2}])",
           "\n      LogicalJoin(condition=[=($0, $3)], joinType=[inner])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n            LogicalFilter(condition=[>=($2, 0)])",
           "\n              LogicalTableScan(table=[[a]])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalProject(col1=[$0])",
           "\n            LogicalTableScan(table=[[b]])",
           "\n"
@@ -173,14 +173,14 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0, 1}])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalAggregate(group=[{1, 2}])",
           "\n      LogicalJoin(condition=[=($0, $3)], joinType=[inner])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n            LogicalFilter(condition=[>=($2, 0)])",
           "\n              LogicalTableScan(table=[[a]])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalProject(col1=[$0])",
           "\n            LogicalTableScan(table=[[b]])",
           "\n"
@@ -193,11 +193,11 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], col2=[$2])",
           "\n  LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0])",
           "\n        LogicalFilter(condition=[OR(=($1, 'bar'), =($1, 'foo'))])",
           "\n          LogicalTableScan(table=[[a]])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col2=[$1])",
           "\n        LogicalFilter(condition=[AND(<>($1, 'alice':VARCHAR(7)), <>($1, 'charlie':VARCHAR(7)))])",
           "\n          LogicalTableScan(table=[[b]])",
@@ -211,11 +211,11 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], bvalue2=[$2])",
           "\n  LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0])",
           "\n        LogicalFilter(condition=[OR(=($1, 'bar'), =($1, 'foo'))])",
           "\n          LogicalTableScan(table=[[a]])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col2=[$1])",
           "\n        LogicalFilter(condition=[AND(<>($1, 'alice':VARCHAR(7)), <>($1, 'charlie':VARCHAR(7)))])",
           "\n          LogicalTableScan(table=[[b]])",
@@ -229,10 +229,10 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], col2=[$1])",
           "\n  LogicalJoin(condition=[=($2, $5)], joinType=[semi])",
-          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n    PinotLogicalExchange(distribution=[hash[2]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
-          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n    PinotLogicalExchange(distribution=[hash[2]])",
           "\n      LogicalTableScan(table=[[b]])",
           "\n"
         ]
@@ -244,23 +244,23 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], col2=[$1])",
           "\n  LogicalJoin(condition=[=($2, $4)], joinType=[semi])",
-          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n    PinotLogicalExchange(distribution=[hash[2]])",
           "\n      LogicalJoin(condition=[=($2, $4)], joinType=[semi])",
-          "\n        LogicalExchange(distribution=[hash[2]])",
+          "\n        PinotLogicalExchange(distribution=[hash[2]])",
           "\n          LogicalJoin(condition=[=($2, $4)], joinType=[semi])",
-          "\n            LogicalExchange(distribution=[hash[2]])",
+          "\n            PinotLogicalExchange(distribution=[hash[2]])",
           "\n              LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n                LogicalFilter(condition=[=($1, 'test')])",
           "\n                  LogicalTableScan(table=[[a]])",
-          "\n            LogicalExchange(distribution=[hash[1]])",
+          "\n            PinotLogicalExchange(distribution=[hash[1]])",
           "\n              LogicalProject(col1=[$0], col3=[$2])",
           "\n                LogicalFilter(condition=[=($0, 'foo')])",
           "\n                  LogicalTableScan(table=[[b]])",
-          "\n        LogicalExchange(distribution=[hash[1]])",
+          "\n        PinotLogicalExchange(distribution=[hash[1]])",
           "\n          LogicalProject(col1=[$0], col3=[$2])",
           "\n            LogicalFilter(condition=[=($0, 'bar')])",
           "\n              LogicalTableScan(table=[[b]])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalFilter(condition=[=($0, 'foobar')])",
           "\n          LogicalTableScan(table=[[b]])",
@@ -275,35 +275,35 @@
           "\nLogicalProject(col1=[$0], col2=[$1])",
           "\n  LogicalFilter(condition=[IS NOT TRUE($8)])",
           "\n    LogicalJoin(condition=[=($6, $7)], joinType=[left])",
-          "\n      LogicalExchange(distribution=[hash[6]])",
+          "\n      PinotLogicalExchange(distribution=[hash[6]])",
           "\n        LogicalProject(col1=[$0], col2=[$1], col30=[$3], $f1=[$4], col32=[$5], $f10=[$7], col34=[$2])",
           "\n          LogicalFilter(condition=[IS NOT TRUE($7)])",
           "\n            LogicalJoin(condition=[=($5, $6)], joinType=[left])",
-          "\n              LogicalExchange(distribution=[hash[5]])",
+          "\n              PinotLogicalExchange(distribution=[hash[5]])",
           "\n                LogicalProject(col1=[$0], col2=[$1], col3=[$2], col30=[$3], $f1=[$5], col32=[$2])",
           "\n                  LogicalFilter(condition=[IS NOT TRUE($5)])",
           "\n                    LogicalJoin(condition=[=($3, $4)], joinType=[left])",
-          "\n                      LogicalExchange(distribution=[hash[3]])",
+          "\n                      PinotLogicalExchange(distribution=[hash[3]])",
           "\n                        LogicalProject(col1=[$0], col2=[$1], col3=[$2], col30=[$2])",
           "\n                          LogicalFilter(condition=[=($1, 'test')])",
           "\n                            LogicalTableScan(table=[[a]])",
-          "\n                      LogicalExchange(distribution=[hash[0]])",
+          "\n                      PinotLogicalExchange(distribution=[hash[0]])",
           "\n                        LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
-          "\n                          LogicalExchange(distribution=[hash[0]])",
+          "\n                          PinotLogicalExchange(distribution=[hash[0]])",
           "\n                            LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
           "\n                              LogicalProject(col3=[$2], $f1=[true])",
           "\n                                LogicalFilter(condition=[=($0, 'foo')])",
           "\n                                  LogicalTableScan(table=[[b]])",
-          "\n              LogicalExchange(distribution=[hash[0]])",
+          "\n              PinotLogicalExchange(distribution=[hash[0]])",
           "\n                LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
-          "\n                  LogicalExchange(distribution=[hash[0]])",
+          "\n                  PinotLogicalExchange(distribution=[hash[0]])",
           "\n                    LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
           "\n                      LogicalProject(col3=[$2], $f1=[true])",
           "\n                        LogicalFilter(condition=[=($0, 'bar')])",
           "\n                          LogicalTableScan(table=[[b]])",
-          "\n      LogicalExchange(distribution=[hash[0]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
           "\n        LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
           "\n              LogicalProject(col3=[$2], $f1=[true])",
           "\n                LogicalFilter(condition=[=($0, 'foobar')])",
@@ -318,12 +318,12 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0])",
           "\n  LogicalJoin(condition=[AND(=($0, $5), =($1, $6), >($3, $10))], joinType=[inner])",
-          "\n    LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n      LogicalTableScan(table=[[a]])",
-          "\n    LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col10=[CAST($0):VARCHAR], col20=[CAST($1):VARCHAR], $f2=[CAST($2):INTEGER], EXPR$3=[*(0.5:DECIMAL(2, 1), $2)])",
           "\n        LogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)])",
-          "\n          LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n            LogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)])",
           "\n              LogicalTableScan(table=[[b]])",
           "\n"
diff --git a/pinot-query-planner/src/test/resources/queries/OrderByPlans.json b/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
index f0d1883d45..5219ae95e4 100644
--- a/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
@@ -60,7 +60,7 @@
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($2)])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -75,7 +75,7 @@
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalProject(col1=[$0], col3=[$2])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -90,7 +90,7 @@
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalAggregate(group=[{0}], sum=[$SUM0($2)])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -105,7 +105,7 @@
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalProject(col1=[$0], col3=[$2])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
diff --git a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
index af10c27bd3..71b261357c 100644
--- a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
@@ -7,10 +7,10 @@
         "output": [
           "Execution Plan",
           "\nLogicalJoin(condition=[=($0, $6)], joinType=[inner])",
-          "\n  LogicalExchange(distribution=[single])",
+          "\n  PinotLogicalExchange(distribution=[single])",
           "\n    LogicalFilter(condition=[>=($2, 0)])",
           "\n      LogicalTableScan(table=[[a]])",
-          "\n  LogicalExchange(distribution=[single])",
+          "\n  PinotLogicalExchange(distribution=[single])",
           "\n    LogicalTableScan(table=[[b]])",
           "\n"
         ]
@@ -23,11 +23,11 @@
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($1):DOUBLE NOT NULL, $2)])",
           "\n  LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n    LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
-          "\n      LogicalExchange(distribution=[single])",
+          "\n      PinotLogicalExchange(distribution=[single])",
           "\n        LogicalProject(col1=[$0])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
           "\n            LogicalTableScan(table=[[a]])",
-          "\n      LogicalExchange(distribution=[single])",
+          "\n      PinotLogicalExchange(distribution=[single])",
           "\n        LogicalProject(col2=[$1], col3=[$2])",
           "\n          LogicalFilter(condition=[<($2, 0)])",
           "\n            LogicalTableScan(table=[[b]])",
@@ -39,11 +39,11 @@
         "sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast',is_colocated_by_join_keys='false') */ a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
         "output": [
           "Execution Plan",
-          "\nLogicalExchange(distribution=[single])",
+          "\nPinotLogicalExchange(distribution=[single])",
           "\n  LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
           "\n    LogicalProject(col1=[$0], col2=[$1])",
           "\n      LogicalTableScan(table=[[a]])",
-          "\n    LogicalExchange(distribution=[broadcast])",
+          "\n    PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalFilter(condition=[>($2, 0)])",
           "\n          LogicalTableScan(table=[[b]])",
@@ -56,9 +56,9 @@
         "output": [
           "Execution Plan",
           "\nLogicalJoin(condition=[=($0, $5)], joinType=[semi])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalTableScan(table=[[a]])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalProject(col2=[$1], col3=[$2])",
           "\n      LogicalFilter(condition=[>($2, 0)])",
           "\n        LogicalTableScan(table=[[b]])",
@@ -70,11 +70,11 @@
         "sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
         "output": [
           "Execution Plan",
-          "\nLogicalExchange(distribution=[single])",
+          "\nPinotLogicalExchange(distribution=[single])",
           "\n  LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
           "\n    LogicalProject(col1=[$0], col2=[$1])",
           "\n      LogicalTableScan(table=[[a]])",
-          "\n    LogicalExchange(distribution=[single])",
+          "\n    PinotLogicalExchange(distribution=[single], relExchangeType=[PIPELINE_BREAKER])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalFilter(condition=[>($2, 0)])",
           "\n          LogicalTableScan(table=[[b]])",
@@ -87,11 +87,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
-          "\n  LogicalExchange(distribution=[single])",
+          "\n  PinotLogicalExchange(distribution=[single])",
           "\n    LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
-          "\n      LogicalExchange(distribution=[broadcast])",
+          "\n      PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
           "\n        LogicalProject(col2=[$1], col3=[$2])",
           "\n          LogicalFilter(condition=[>($2, 0)])",
           "\n            LogicalTableScan(table=[[b]])",
@@ -104,13 +104,13 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalAggregate(group=[{1}], EXPR$1=[$SUM0($2)])",
-          "\n      LogicalExchange(distribution=[single])",
+          "\n      PinotLogicalExchange(distribution=[single])",
           "\n        LogicalJoin(condition=[=($0, $3)], joinType=[semi])",
           "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n            LogicalTableScan(table=[[a]])",
-          "\n          LogicalExchange(distribution=[broadcast])",
+          "\n          PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
           "\n            LogicalProject(col2=[$1], col3=[$2])",
           "\n              LogicalFilter(condition=[>($2, 0)])",
           "\n                LogicalTableScan(table=[[b]])",
@@ -124,7 +124,7 @@
           "Execution Plan",
           "\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
           "\n  LogicalAggregate(group=[{0, 1}], EXPR$2=[$SUM0($2)])",
-          "\n    LogicalExchange(distribution=[hash[0, 1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalFilter(condition=[AND(>=($2, 0), =($0, 'a'))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -139,7 +139,7 @@
           "\nLogicalProject(col2=[$0], EXPR$1=[$1], EXPR$2=[$2], EXPR$3=[$3])",
           "\n  LogicalFilter(condition=[AND(>($1, 10), >=($4, 0), <($5, 20), <=($2, 10), =(/(CAST($2):DOUBLE NOT NULL, $1), 5))])",
           "\n    LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[$SUM0($1)], EXPR$3=[$SUM0($2)], agg#3=[MAX($1)], agg#4=[MIN($1)])",
-          "\n      LogicalExchange(distribution=[hash[0]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
           "\n        LogicalProject(col2=[$1], col3=[$2], $f2=[CAST($0):DECIMAL(1000, 500) NOT NULL])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
           "\n            LogicalTableScan(table=[[a]])",
diff --git a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
index c131fecb49..1e0991e96a 100644
--- a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
@@ -7,10 +7,10 @@
         "output": [
           "Execution Plan",
           "\nLogicalUnion(all=[true])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalProject(col1=[$0], col2=[$1])",
           "\n      LogicalTableScan(table=[[a]])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalProject(col1=[$0], col2=[$1])",
           "\n      LogicalTableScan(table=[[b]])",
           "\n"
@@ -22,15 +22,15 @@
         "output": [
           "Execution Plan",
           "\nLogicalUnion(all=[true])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalUnion(all=[true])",
-          "\n      LogicalExchange(distribution=[hash[0, 1]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n        LogicalProject(col1=[$0], col2=[$1])",
           "\n          LogicalTableScan(table=[[a]])",
-          "\n      LogicalExchange(distribution=[hash[0, 1]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n        LogicalProject(col1=[$0], col2=[$1])",
           "\n          LogicalTableScan(table=[[b]])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalProject(col1=[$0], col2=[$1])",
           "\n      LogicalTableScan(table=[[c]])",
           "\n"
@@ -42,18 +42,18 @@
         "output": [
           "Execution Plan",
           "\nLogicalAggregate(group=[{0, 1}])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalAggregate(group=[{0, 1}])",
           "\n      LogicalUnion(all=[true])",
-          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalUnion(all=[true])",
-          "\n            LogicalExchange(distribution=[hash[0, 1]])",
+          "\n            PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n              LogicalProject(col1=[$0], col2=[$1])",
           "\n                LogicalTableScan(table=[[a]])",
-          "\n            LogicalExchange(distribution=[hash[0, 1]])",
+          "\n            PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n              LogicalProject(col1=[$0], col2=[$1])",
           "\n                LogicalTableScan(table=[[b]])",
-          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalProject(col1=[$0], col2=[$1])",
           "\n            LogicalTableScan(table=[[c]])",
           "\n"
@@ -65,15 +65,15 @@
         "output": [
           "Execution Plan",
           "\nLogicalIntersect(all=[false])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalIntersect(all=[false])",
-          "\n      LogicalExchange(distribution=[hash[0, 1]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n        LogicalProject(col1=[$0], col2=[$1])",
           "\n          LogicalTableScan(table=[[a]])",
-          "\n      LogicalExchange(distribution=[hash[0, 1]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n        LogicalProject(col1=[$0], col2=[$1])",
           "\n          LogicalTableScan(table=[[b]])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalProject(col1=[$0], col2=[$1])",
           "\n      LogicalTableScan(table=[[c]])",
           "\n"
@@ -85,15 +85,15 @@
         "output": [
           "Execution Plan",
           "\nLogicalMinus(all=[false])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalMinus(all=[false])",
-          "\n      LogicalExchange(distribution=[hash[0, 1]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n        LogicalProject(col1=[$0], col2=[$1])",
           "\n          LogicalTableScan(table=[[a]])",
-          "\n      LogicalExchange(distribution=[hash[0, 1]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n        LogicalProject(col1=[$0], col2=[$1])",
           "\n          LogicalTableScan(table=[[b]])",
-          "\n  LogicalExchange(distribution=[hash[0, 1]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n    LogicalProject(col1=[$0], col2=[$1])",
           "\n      LogicalTableScan(table=[[c]])",
           "\n"
diff --git a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
index b93665800e..2f75b4b2ce 100644
--- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
@@ -8,7 +8,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -21,7 +21,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1])",
           "\n  LogicalWindow(window#0=[window(aggs [COUNT()])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(winLiteral=[0])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -35,7 +35,7 @@
           "\nLogicalProject(EXPR$0=[42], EXPR$1=[$0])",
           "\n  LogicalProject($0=[$1])",
           "\n    LogicalWindow(window#0=[window(aggs [COUNT()])])",
-          "\n      LogicalExchange(distribution=[hash])",
+          "\n      PinotLogicalExchange(distribution=[hash])",
           "\n        LogicalProject(winLiteral=[0])",
           "\n          LogicalTableScan(table=[[a]])",
           "\n"
@@ -48,7 +48,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1])",
           "\n  LogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(winLiteral=[0])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -61,7 +61,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($1)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -73,7 +73,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n  LogicalExchange(distribution=[hash])",
+          "\n  PinotLogicalExchange(distribution=[hash])",
           "\n    LogicalProject(col1=[$0])",
           "\n      LogicalTableScan(table=[[a]])",
           "\n"
@@ -87,7 +87,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($1)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -101,7 +101,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -117,7 +117,7 @@
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[$3], col2=[$1])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($2)])])",
-          "\n          LogicalExchange(distribution=[hash])",
+          "\n          PinotLogicalExchange(distribution=[hash])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -133,7 +133,7 @@
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[$2], col2=[$1])",
           "\n        LogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n          LogicalExchange(distribution=[hash])",
+          "\n          PinotLogicalExchange(distribution=[hash])",
           "\n            LogicalProject(col1=[$0], col2=[$1])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -149,7 +149,7 @@
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], $1=[$2])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($1)])])",
-          "\n          LogicalExchange(distribution=[hash])",
+          "\n          PinotLogicalExchange(distribution=[hash])",
           "\n            LogicalProject(col1=[$0], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -165,7 +165,7 @@
           "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[20])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[$3], col2=[$1])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($2)])])",
-          "\n          LogicalExchange(distribution=[hash])",
+          "\n          PinotLogicalExchange(distribution=[hash])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -178,7 +178,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col3=[$2], $1=[CONCAT($0, '-', $1)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -191,7 +191,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalFilter(condition=[>($2, 10)])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -204,7 +204,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n  LogicalExchange(distribution=[hash])",
+          "\n  PinotLogicalExchange(distribution=[hash])",
           "\n    LogicalProject(col1=[$0])",
           "\n      LogicalFilter(condition=[>($2, 10)])",
           "\n        LogicalTableScan(table=[[a]])",
@@ -218,7 +218,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(aggs [MIN($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col3=[$2], $1=[CONCAT($0, '-', $1)])",
           "\n        LogicalFilter(condition=[OR(=($0, 'bar'), =($0, 'foo'))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -232,9 +232,9 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1])",
           "\n  LogicalWindow(window#0=[window(aggs [MIN($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{0}])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalAggregate(group=[{2}])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -247,9 +247,9 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(aggs [MIN($1)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{0, 1}])",
-          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalAggregate(group=[{0, 2}])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -262,9 +262,9 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$1], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(aggs [MIN($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{0}], EXPR$0=[$SUM0($1)])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalAggregate(group=[{2}], EXPR$0=[$SUM0($2)])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -277,10 +277,10 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
           "\n        LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -296,10 +296,10 @@
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], col3=[$0])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
-          "\n          LogicalExchange(distribution=[hash])",
+          "\n          PinotLogicalExchange(distribution=[hash])",
           "\n            LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
           "\n              LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
-          "\n                LogicalExchange(distribution=[hash[0]])",
+          "\n                PinotLogicalExchange(distribution=[hash[0]])",
           "\n                  LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n                    LogicalTableScan(table=[[a]])",
           "\n"
@@ -315,10 +315,10 @@
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[$2], col3=[$0])",
           "\n        LogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n          LogicalExchange(distribution=[hash])",
+          "\n          PinotLogicalExchange(distribution=[hash])",
           "\n            LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
           "\n              LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
-          "\n                LogicalExchange(distribution=[hash[0]])",
+          "\n                PinotLogicalExchange(distribution=[hash[0]])",
           "\n                  LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n                    LogicalTableScan(table=[[a]])",
           "\n"
@@ -331,7 +331,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2], $1=[$3])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -344,7 +344,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], $1=[$3], $2=[$4])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -358,7 +358,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], $1=[$3], $2=[$4])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -372,7 +372,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($0), MIN($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -388,7 +388,7 @@
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
-          "\n          LogicalExchange(distribution=[hash])",
+          "\n          PinotLogicalExchange(distribution=[hash])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -404,7 +404,7 @@
           "\n    LogicalSort(fetch=[100])",
           "\n      LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
-          "\n          LogicalExchange(distribution=[hash])",
+          "\n          PinotLogicalExchange(distribution=[hash])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -420,7 +420,7 @@
           "\n    LogicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
           "\n      LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
-          "\n          LogicalExchange(distribution=[hash])",
+          "\n          PinotLogicalExchange(distribution=[hash])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -433,7 +433,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1], $1=[$2], $2=[$3])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($0), MAX($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col3=[$2], $1=[CONCAT($0, '-', $1)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -446,7 +446,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($1), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalFilter(condition=[>($2, 100)])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -460,7 +460,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1], $1=[$2], $2=[$3])",
           "\n  LogicalWindow(window#0=[window(aggs [MIN($0), MAX($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col3=[$2], $1=[LENGTH(CONCAT($0, ' ', $1))])",
           "\n        LogicalFilter(condition=[AND(<>($0, 'bar'), <>($0, 'baz'), <>($0, 'foo'))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -475,7 +475,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$0], EXPR$1=[$1], EXPR$2=[$1])",
           "\n  LogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject($0=[LENGTH(CONCAT($0, ' ', $1))])",
           "\n        LogicalFilter(condition=[AND(<>($0, 'bar'), <>($0, 'baz'), <>($0, 'foo'))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -489,9 +489,9 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(aggs [MIN($0), SUM($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{0}])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalAggregate(group=[{2}])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -504,9 +504,9 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], $1=[$2], $2=[$3])",
           "\n  LogicalWindow(window#0=[window(aggs [MIN($1), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{0, 1}])",
-          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalAggregate(group=[{0, 2}])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -519,9 +519,9 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$1], $1=[$2], $2=[$3])",
           "\n  LogicalWindow(window#0=[window(aggs [MIN($0), MAX($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalAggregate(group=[{0}], EXPR$0=[$SUM0($1)])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalAggregate(group=[{2}], EXPR$0=[$SUM0($2)])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -534,10 +534,10 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$2])",
           "\n  LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash])",
+          "\n    PinotLogicalExchange(distribution=[hash])",
           "\n      LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
           "\n        LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -553,10 +553,10 @@
           "\n    LogicalSort(sort0=[$3], dir0=[ASC])",
           "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$2], col3=[$0])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
-          "\n          LogicalExchange(distribution=[hash])",
+          "\n          PinotLogicalExchange(distribution=[hash])",
           "\n            LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
           "\n              LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
-          "\n                LogicalExchange(distribution=[hash[0]])",
+          "\n                PinotLogicalExchange(distribution=[hash[0]])",
           "\n                  LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n                    LogicalTableScan(table=[[a]])",
           "\n"
@@ -569,7 +569,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -582,7 +582,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1])",
           "\n  LogicalWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -596,7 +596,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -609,7 +609,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {1} aggs [SUM($2), COUNT($2)])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -622,7 +622,7 @@
           "Execution Plan",
           "\nLogicalProject(value1=[$0], avg=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {1} aggs [SUM($2), COUNT($2)])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -636,7 +636,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {1} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -650,7 +650,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [MAX($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -666,7 +666,7 @@
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0} aggs [MIN($2)])])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -682,7 +682,7 @@
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(partition {1} aggs [SUM($2), COUNT($2)])])",
-          "\n          LogicalExchange(distribution=[hash[1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[1]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -698,7 +698,7 @@
           "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
           "\n      LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0} aggs [MIN($2)])])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -714,7 +714,7 @@
           "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
           "\n      LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col1=[$0], col2=[$1])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -727,7 +727,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2], $1=[$3])",
           "\n  LogicalWindow(window#0=[window(partition {1} aggs [COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col2=[$1], col3=[$2], $2=[SUBSTR($0, 0, 2)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -740,7 +740,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col3=[$2], $1=[SUBSTR($0, 0, 2)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -753,7 +753,7 @@
           "Execution Plan",
           "\nLogicalProject(col2=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalFilter(condition=[AND(>($2, 10), <=($2, 500))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -767,7 +767,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2], $2=[CONCAT($0, '-', $1)])",
           "\n        LogicalFilter(condition=[OR(AND(<>($0, 'bar'), <>($0, 'foo')), >=($2, 42))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -781,7 +781,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[/(CAST($2):DOUBLE NOT NULL, $3)])",
           "\n  LogicalWindow(window#0=[window(partition {1} aggs [SUM($0), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col3=[$2], $1=[CONCAT($0, '-', $1)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -794,7 +794,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1])",
           "\n  LogicalWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject($0=[CONCAT($0, '-', $1)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -807,9 +807,9 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [MIN($0)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalAggregate(group=[{0}])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalAggregate(group=[{2}])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -822,9 +822,9 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [MIN($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalAggregate(group=[{0, 1}])",
-          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalAggregate(group=[{0, 2}])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -836,10 +836,10 @@
         "output": [
           "Execution Plan",
           "\nLogicalWindow(window#0=[window(partition {0} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalProject(col1=[$0])",
           "\n      LogicalAggregate(group=[{0, 1}])",
-          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalAggregate(group=[{0, 2}])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -852,9 +852,9 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$1], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [MIN($0)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalAggregate(group=[{0}], EXPR$0=[$SUM0($1)])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalAggregate(group=[{2}], EXPR$0=[$SUM0($2)])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -867,10 +867,10 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
           "\n        LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -886,10 +886,10 @@
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], col3=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), COUNT($0)])])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
           "\n              LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
-          "\n                LogicalExchange(distribution=[hash[0]])",
+          "\n                PinotLogicalExchange(distribution=[hash[0]])",
           "\n                  LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n                    LogicalTableScan(table=[[a]])",
           "\n"
@@ -902,7 +902,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3], $1=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [MAX($2), COUNT($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -916,7 +916,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3], $1=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [MAX($2), COUNT($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -929,7 +929,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1), MIN($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -942,7 +942,7 @@
           "Execution Plan",
           "\nLogicalProject(value1=[$0], avg=[/(CAST($2):DOUBLE NOT NULL, $3)], min=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1), MIN($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -956,7 +956,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2], $1=[$3])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [COUNT($1), MIN($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -972,7 +972,7 @@
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(partition {0, 1} aggs [SUM($2), MAX($2)])])",
-          "\n          LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -988,7 +988,7 @@
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n        LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1), MIN($1)])])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col1=[$0], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1004,7 +1004,7 @@
           "\n    LogicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
           "\n      LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(partition {0, 1} aggs [SUM($2), MAX($2)])])",
-          "\n          LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1020,7 +1020,7 @@
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(partition {0, 1} aggs [SUM($2), MAX($2)])])",
-          "\n          LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1036,7 +1036,7 @@
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(partition {0, 1} aggs [SUM($2), COUNT($2)])])",
-          "\n          LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1053,7 +1053,7 @@
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalProject(col1=[$0], $1=[$2], $2=[$3])",
           "\n        LogicalWindow(window#0=[window(partition {0, 1} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER(), ROW_NUMBER()])])",
-          "\n          LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n            LogicalProject(col1=[$0], col2=[$1])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1069,7 +1069,7 @@
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[$3], EXPR$2=[/(CAST($4):DOUBLE NOT NULL, $5)])",
           "\n        LogicalWindow(window#0=[window(partition {0, 1} aggs [SUM($2), SUM($2), COUNT($2)])])",
-          "\n          LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1085,7 +1085,7 @@
           "\n    LogicalSort(sort0=[$3], dir0=[ASC])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$1])",
           "\n        LogicalWindow(window#0=[window(partition {0, 1} aggs [SUM($2), COUNT($2)])])",
-          "\n          LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1098,7 +1098,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), MAX($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2], $2=[REVERSE($0)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1111,7 +1111,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalFilter(condition=[AND(>($2, 42), OR(=($0, 'chewbacca':VARCHAR(9)), =($0, 'vader':VARCHAR(9)), =($0, 'yoda':VARCHAR(9))))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -1125,7 +1125,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [MIN($1), MAX($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2], $2=[REVERSE(CONCAT($0, ' ', $1))])",
           "\n        LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'baz'), <>($1, 'foo'))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -1139,7 +1139,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$1=[$5])",
           "\n  LogicalWindow(window#0=[window(partition {2} aggs [SUM($1), COUNT($1), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n    PinotLogicalExchange(distribution=[hash[2]])",
           "\n      LogicalProject(col1=[$0], col3=[$2], $2=[REVERSE(CONCAT($0, '-', $1))])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1152,9 +1152,9 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [MIN($0), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalAggregate(group=[{0}])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalAggregate(group=[{2}])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -1167,9 +1167,9 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[$2], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [MIN($1), SUM($1), COUNT($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalAggregate(group=[{0, 1}])",
-          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalAggregate(group=[{0, 2}])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -1182,9 +1182,9 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$1], $1=[$2], $2=[$3])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [MIN($0), SUM($0)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalAggregate(group=[{0}], EXPR$0=[$SUM0($1)])",
-          "\n        LogicalExchange(distribution=[hash[0]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
           "\n          LogicalAggregate(group=[{2}], EXPR$0=[$SUM0($2)])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -1197,10 +1197,10 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), COUNT($0), MAX($0)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
           "\n        LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1216,10 +1216,10 @@
           "\n    LogicalSort(sort0=[$3], dir0=[ASC])",
           "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4], col3=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), COUNT($0), MAX($0)])])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT NULL, $2)])",
           "\n              LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[$SUM0($2)])",
-          "\n                LogicalExchange(distribution=[hash[0]])",
+          "\n                PinotLogicalExchange(distribution=[hash[0]])",
           "\n                  LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], agg#1=[COUNT()])",
           "\n                    LogicalTableScan(table=[[a]])",
           "\n"
@@ -1700,7 +1700,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1714,7 +1714,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1728,7 +1728,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$1])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1741,7 +1741,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($2), COUNT($2)])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1754,7 +1754,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {1} order by [1] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1767,7 +1767,7 @@
           "Execution Plan",
           "\nLogicalProject(value1=[$0], avg=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($2), COUNT($2)])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1780,7 +1780,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [MAX($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1796,7 +1796,7 @@
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0} order by [0] aggs [MIN($2)])])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1812,7 +1812,7 @@
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($2), COUNT($2)])])",
-          "\n          LogicalExchange(distribution=[hash[1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[1]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1828,7 +1828,7 @@
           "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
           "\n      LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0} order by [0] aggs [MIN($2)])])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1844,7 +1844,7 @@
           "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
           "\n      LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0} order by [0] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col1=[$0], col2=[$1])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1857,7 +1857,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2], $1=[$3])",
           "\n  LogicalWindow(window#0=[window(partition {1} order by [1] aggs [COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col2=[$1], col3=[$2], $2=[SUBSTR($0, 0, 2)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1870,7 +1870,7 @@
           "Execution Plan",
           "\nLogicalProject(col2=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalFilter(condition=[AND(>($2, 10), <=($2, 500))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -1883,7 +1883,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalWindow(window#0=[window(partition {0} order by [0] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
           "\n    LogicalProject(col2=[$1])",
           "\n      LogicalFilter(condition=[AND(>($2, 10), <=($2, 500))])",
           "\n        LogicalTableScan(table=[[a]])",
@@ -1897,7 +1897,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2], $2=[CONCAT($0, '-', $1)])",
           "\n        LogicalFilter(condition=[OR(AND(<>($0, 'bar'), <>($0, 'foo')), >=($2, 42))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -1911,7 +1911,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[/(CAST($2):DOUBLE NOT NULL, $3)])",
           "\n  LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($0), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col3=[$2], $1=[CONCAT($0, '-', $1)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1924,7 +1924,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {1} order by [1 DESC] aggs [SUM($2), COUNT($2)])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1937,7 +1937,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], $1=[$2])",
           "\n  LogicalWindow(window#0=[window(partition {1} order by [1 DESC] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1950,7 +1950,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {1} order by [1 ASC-nulls-first] aggs [SUM($2), COUNT($2)])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1963,7 +1963,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {1} order by [1 DESC-nulls-last] aggs [SUM($2), COUNT($2)])])",
-          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n    PinotLogicalExchange(distribution=[hash[1]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1976,7 +1976,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3], $1=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [MAX($2), COUNT($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1990,7 +1990,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3], $1=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [MAX($2), COUNT($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -2003,7 +2003,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -2016,7 +2016,7 @@
           "Execution Plan",
           "\nLogicalProject(value1=[$0], avg=[/(CAST($2):DOUBLE NOT NULL, $3)], min=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -2029,7 +2029,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2], $1=[$3])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [COUNT($1), MIN($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -2045,7 +2045,7 @@
           "\n    LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$1])",
           "\n        LogicalWindow(window#0=[window(partition {0, 1} order by [1, 0] aggs [SUM($2), COUNT($2)])])",
-          "\n          LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -2061,7 +2061,7 @@
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n        LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col1=[$0], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -2077,7 +2077,7 @@
           "\n    LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC], fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$1])",
           "\n        LogicalWindow(window#0=[window(partition {0, 1} order by [1, 0] aggs [SUM($2), COUNT($2)])])",
-          "\n          LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -2090,7 +2090,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), MAX($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col2=[$1], col3=[$2], $2=[REVERSE($0)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -2103,7 +2103,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalFilter(condition=[AND(>($2, 42), OR(=($0, 'chewbacca':VARCHAR(9)), =($0, 'vader':VARCHAR(9)), =($0, 'yoda':VARCHAR(9))))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -2117,7 +2117,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] aggs [MIN($1), MAX($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2], $2=[REVERSE(CONCAT($0, ' ', $1))])",
           "\n        LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'baz'), <>($1, 'foo'))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -2131,7 +2131,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$1=[$5])",
           "\n  LogicalWindow(window#0=[window(partition {2} order by [2] aggs [SUM($1), COUNT($1), COUNT($0)])])",
-          "\n    LogicalExchange(distribution=[hash[2]])",
+          "\n    PinotLogicalExchange(distribution=[hash[2]])",
           "\n      LogicalProject(col1=[$0], col3=[$2], $2=[REVERSE(CONCAT($0, '-', $1))])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -2145,7 +2145,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$1], EXPR$1=[$1])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject($0=[REVERSE(CONCAT($0, '-', $1))])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -2158,7 +2158,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0 DESC] aggs [SUM($1), COUNT($1), MIN($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -2171,7 +2171,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0 ASC-nulls-first] aggs [SUM($1), COUNT($1), MIN($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -2184,7 +2184,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [0 DESC-nulls-last] aggs [SUM($1), COUNT($1), MIN($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -2615,13 +2615,13 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], col10=[$2], $2=[$3])",
           "\n  LogicalWindow(window#0=[window(partition {0} aggs [SUM($1)])])",
-          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalExchange(distribution=[hash[0]])",
           "\n      LogicalProject(col1=[$0], col3=[$1], col10=[$2])",
           "\n        LogicalJoin(condition=[=($0, $3)], joinType=[inner])",
-          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
           "\n            LogicalProject(col1=[$0], col3=[$2])",
           "\n              LogicalTableScan(table=[[a]])",
-          "\n          LogicalExchange(distribution=[hash[1]])",
+          "\n          PinotLogicalExchange(distribution=[hash[1]])",
           "\n            LogicalProject(col1=[$0], col2=[$1])",
           "\n              LogicalTableScan(table=[[b]])",
           "\n"
@@ -2636,7 +2636,7 @@
           "\n  LogicalWindow(window#0=[window(order by [2 DESC, 0] aggs [SUM($1), COUNT($1)])])",
           "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[2 DESC, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
-          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalAggregate(group=[{0, 2}], EXPR$1=[COUNT()])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -2651,7 +2651,7 @@
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[1 DESC, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalProject(col1=[$0], EXPR$1=[$2])",
           "\n      LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
-          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalAggregate(group=[{0, 2}], EXPR$1=[COUNT()])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -2666,7 +2666,7 @@
           "\n  LogicalWindow(window#0=[window(partition {0} order by [2 DESC, 0] aggs [MAX($1)])])",
           "\n    PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2 DESC, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
-          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalAggregate(group=[{0, 2}], EXPR$1=[COUNT()])",
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
@@ -2711,11 +2711,11 @@
           "\n      PinotLogicalSortExchange(distribution=[hash[0]], collation=[[1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n        LogicalProject(col2=[$1], col3=[$2])",
           "\n          LogicalJoin(condition=[=($0, $3)], joinType=[inner])",
-          "\n            LogicalExchange(distribution=[hash[0]])",
+          "\n            PinotLogicalExchange(distribution=[hash[0]])",
           "\n              LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n                LogicalFilter(condition=[>($2, 100)])",
           "\n                  LogicalTableScan(table=[[a]])",
-          "\n            LogicalExchange(distribution=[hash[0]])",
+          "\n            PinotLogicalExchange(distribution=[hash[0]])",
           "\n              LogicalProject(col2=[$1])",
           "\n                LogicalFilter(condition=[OR(=($0, 'brandon sanderson':VARCHAR(17)), =($0, 'douglas adams':VARCHAR(17)))])",
           "\n                  LogicalTableScan(table=[[b]])",
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 6fa02a216a..914c6c2ed0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -220,7 +220,7 @@ public class QueryRunner {
         new LeafStageTransferableBlockOperator(opChainExecutionContext, this::processServerQueryRequest,
             serverQueryRequests, sendNode.getDataSchema());
     MailboxSendOperator mailboxSendOperator =
-        new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getExchangeType(),
+        new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getDistributionType(),
             sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(),
             sendNode.isSortOnSender(), sendNode.getReceiverStageId());
     return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 68fcd82f67..722790d867 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -71,14 +71,14 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
   public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PhysicalPlanContext context) {
     if (node.isSortOnReceiver()) {
       SortedMailboxReceiveOperator sortedMailboxReceiveOperator =
-          new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
+          new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getDistributionType(),
               node.getDataSchema(), node.getCollationKeys(), node.getCollationDirections(),
               node.getCollationNullDirections(), node.isSortOnSender(), node.getSenderStageId());
       context.addReceivingMailboxIds(sortedMailboxReceiveOperator.getMailboxIds());
       return sortedMailboxReceiveOperator;
     } else {
       MailboxReceiveOperator mailboxReceiveOperator =
-          new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
+          new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getDistributionType(),
               node.getSenderStageId());
       context.addReceivingMailboxIds(mailboxReceiveOperator.getMailboxIds());
       return mailboxReceiveOperator;
@@ -88,7 +88,7 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
   @Override
   public MultiStageOperator visitMailboxSend(MailboxSendNode node, PhysicalPlanContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
-    return new MailboxSendOperator(context.getOpChainExecutionContext(), nextOperator, node.getExchangeType(),
+    return new MailboxSendOperator(context.getOpChainExecutionContext(), nextOperator, node.getDistributionType(),
         node.getPartitionKeySelector(), node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(),
         node.getReceiverStageId());
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
index 03ea0b2115..49add00a11 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.plan.pipeline;
 
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
@@ -40,9 +41,7 @@ public class PipelineBreakerVisitor extends DefaultPostOrderTraversalVisitor<Voi
   @Override
   public Void visitMailboxReceive(MailboxReceiveNode node, PipelineBreakerContext context) {
     process(node, context);
-    // TODO: actually implement pipeline breaker attribute in PlanNode
-    // currently all mailbox receive node from leaf stage is considered as pipeline breaker node.
-    if (context.isLeafStage()) {
+    if (node.getExchangeType() == PinotRelExchangeType.PIPELINE_BREAKER) {
       context.addPipelineBreaker(node);
     }
     return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org