You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/03/16 21:31:15 UTC

[pinot] branch master updated: Add a PinotLogicalSortExchange to replace usage of LogicalSortExchange as it will be sender and receiver aware (#10408)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bc9aa75585 Add a PinotLogicalSortExchange to replace usage of LogicalSortExchange as it will be sender and receiver aware (#10408)
bc9aa75585 is described below

commit bc9aa755859497aca3fcc0736c1b4db694f44127
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Thu Mar 16 14:31:06 2023 -0700

    Add a PinotLogicalSortExchange to replace usage of LogicalSortExchange as it will be sender and receiver aware (#10408)
    
    - MailboxSendOperator will be modified later to add sort support.
    - Modify SortOperator to avoid sorting if the input is already sorted. It should still apply offset + limit
---
 .../rel/logical/PinotLogicalSortExchange.java      | 112 ++++++
 .../rel/rules/PinotSortExchangeCopyRule.java       |   4 +-
 .../rel/rules/PinotSortExchangeNodeInsertRule.java |  11 +-
 .../rules/PinotWindowExchangeNodeInsertRule.java   |  17 +-
 .../pinot/query/planner/logical/StagePlanner.java  |  34 +-
 .../query/planner/stage/MailboxReceiveNode.java    |  46 +++
 .../pinot/query/planner/stage/MailboxSendNode.java |  42 ++-
 .../rel/rules/PinotSortExchangeCopyRuleTest.java   |  89 ++++-
 .../test/resources/queries/BasicQueryPlans.json    |   4 +-
 .../src/test/resources/queries/JoinPlans.json      |   4 +-
 .../src/test/resources/queries/OrderByPlans.json   |  16 +-
 .../resources/queries/WindowFunctionPlans.json     | 176 ++++-----
 .../apache/pinot/query/runtime/QueryRunner.java    |   3 +-
 .../runtime/operator/MailboxReceiveOperator.java   |  77 +++-
 .../runtime/operator/MailboxSendOperator.java      |  20 +-
 .../pinot/query/runtime/operator/SortOperator.java |  57 +--
 .../query/runtime/operator/utils/SortUtils.java    |  73 ++++
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  10 +-
 .../query/service/dispatch/QueryDispatcher.java    |   4 +-
 .../operator/MailboxReceiveOperatorTest.java       | 395 +++++++++++++++++++--
 .../runtime/operator/MailboxSendOperatorTest.java  |  10 +-
 .../query/runtime/operator/SortOperatorTest.java   | 153 +++++++-
 22 files changed, 1124 insertions(+), 233 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java
new file mode 100644
index 0000000000..e019a680c1
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.SortExchange;
+
+
+/**
+ * Pinot's implementation of {@code SortExchange} which needs information about whether to sort on the sender
+ * and/or receiver side of the exchange. Every {@code Exchange} is broken into a send and a receive node and the
+ * decision on where to sort is made by the planner and this information has to b passed onto the send and receive
+ * nodes for the correct execution.
+ *
+ * Note: This class does not extend {@code LogicalSortExchange} because its constructor which takes the list of
+ * parameters is private.
+ */
+public class PinotLogicalSortExchange extends SortExchange {
+
+  protected final boolean _isSortOnSender;
+  protected final boolean _isSortOnReceiver;
+
+  private PinotLogicalSortExchange(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, RelDistribution distribution, RelCollation collation,
+      boolean isSortOnSender, boolean isSortOnReceiver) {
+    super(cluster, traitSet, input, distribution, collation);
+    _isSortOnSender = isSortOnSender;
+    _isSortOnReceiver = isSortOnReceiver;
+  }
+
+  /**
+   * Creates a PinotLogicalSortExchange by parsing serialized output.
+   */
+  public PinotLogicalSortExchange(RelInput input) {
+    super(input);
+    _isSortOnSender = false;
+    _isSortOnReceiver = true;
+  }
+
+  /**
+   * Creates a PinotLogicalSortExchange.
+   *
+   * @param input     Input relational expression
+   * @param distribution Distribution specification
+   * @param collation array of sort specifications
+   * @param isSortOnSender whether to sort on the sender
+   * @param isSortOnReceiver whether to sort on receiver
+   */
+  public static PinotLogicalSortExchange create(
+      RelNode input,
+      RelDistribution distribution,
+      RelCollation collation,
+      boolean isSortOnSender,
+      boolean isSortOnReceiver) {
+    RelOptCluster cluster = input.getCluster();
+    collation = RelCollationTraitDef.INSTANCE.canonize(collation);
+    distribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
+    RelTraitSet traitSet =
+        input.getTraitSet().replace(Convention.NONE).replace(distribution).replace(collation);
+    return new PinotLogicalSortExchange(cluster, traitSet, input, distribution,
+        collation, isSortOnSender, isSortOnReceiver);
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  @Override
+  public SortExchange copy(RelTraitSet traitSet, RelNode newInput,
+      RelDistribution newDistribution, RelCollation newCollation) {
+    return new PinotLogicalSortExchange(this.getCluster(), traitSet, newInput,
+        newDistribution, newCollation, _isSortOnSender, _isSortOnReceiver);
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw)
+        .item("isSortOnSender", _isSortOnSender)
+        .item("isSortOnReceiver", _isSortOnReceiver);
+  }
+
+  public boolean isSortOnSender() {
+    return _isSortOnSender;
+  }
+
+  public boolean isSortOnReceiver() {
+    return _isSortOnReceiver;
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
index 3a15679fd7..4f93f78efd 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
@@ -26,7 +26,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.SortExchange;
 import org.apache.calcite.rel.logical.LogicalSort;
-import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
 import org.apache.calcite.rel.metadata.RelMdUtil;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexBuilder;
@@ -99,7 +99,7 @@ public class PinotSortExchangeCopyRule extends RelRule<RelRule.Config> {
   public interface Config extends RelRule.Config {
 
     Config DEFAULT = ImmutableSortExchangeCopyRule.Config.of()
-        .withOperandFor(LogicalSort.class, LogicalSortExchange.class);
+        .withOperandFor(LogicalSort.class, PinotLogicalSortExchange.class);
 
     @Override default PinotSortExchangeCopyRule toRule() {
       return new PinotSortExchangeCopyRule(this);
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
index f03b78dc3d..28a5018a08 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
@@ -24,7 +24,7 @@ import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.logical.LogicalSort;
-import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
 import org.apache.calcite.tools.RelBuilderFactory;
 
 
@@ -62,10 +62,15 @@ public class PinotSortExchangeNodeInsertRule extends RelOptRule {
   @Override
   public void onMatch(RelOptRuleCall call) {
     Sort sort = call.rel(0);
-    LogicalSortExchange exchange = LogicalSortExchange.create(
+    // TODO: Assess whether sorting is needed on both sender and receiver side or only receiver side. Potentially add
+    //       SqlHint support to determine this. For now setting sort only on receiver side as sender side sorting is
+    //       not yet implemented.
+    PinotLogicalSortExchange exchange = PinotLogicalSortExchange.create(
         sort.getInput(),
         RelDistributions.hash(Collections.emptyList()),
-        sort.getCollation());
+        sort.getCollation(),
+        false,
+        true);
     call.transformTo(LogicalSort.create(exchange, sort.getCollation(), sort.offset, sort.fetch));
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
index ebae6c0df9..e9283db8f0 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
@@ -29,8 +29,8 @@ import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Window;
 import org.apache.calcite.rel.logical.LogicalExchange;
-import org.apache.calcite.rel.logical.LogicalSortExchange;
 import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.tools.RelBuilderFactory;
 
@@ -86,8 +86,11 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
     } else if (windowGroup.keys.isEmpty() && !windowGroup.orderKeys.getKeys().isEmpty()) {
       // Only ORDER BY
       // Add a LogicalSortExchange with collation on the order by key(s) and an empty hash partition key
-      LogicalSortExchange sortExchange = LogicalSortExchange.create(windowInput,
-          RelDistributions.hash(Collections.emptyList()), windowGroup.orderKeys);
+      // TODO: ORDER BY only type queries need to be sorted on both sender and receiver side for better performance.
+      //       Sorted input data can use a k-way merge instead of a PriorityQueue for sorting. For now support to
+      //       sort on the sender side is not available thus setting this up to only sort on the receiver.
+      PinotLogicalSortExchange sortExchange = PinotLogicalSortExchange.create(windowInput,
+          RelDistributions.hash(Collections.emptyList()), windowGroup.orderKeys, false, true);
       call.transformTo(LogicalWindow.create(window.getTraitSet(), sortExchange, window.constants, window.getRowType(),
           window.groups));
     } else {
@@ -106,8 +109,12 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
       } else {
         // PARTITION BY and ORDER BY on different key(s)
         // Add a LogicalSortExchange hashed on the partition by keys and collation based on order by keys
-        LogicalSortExchange sortExchange = LogicalSortExchange.create(windowInput,
-            RelDistributions.hash(windowGroup.keys.toList()), windowGroup.orderKeys);
+        // TODO: ORDER BY only type queries need to be sorted only on the receiver side unless a hint is set indicating
+        //       that the data is already partitioned and sorting can be done on the sender side instead. This way
+        //       sorting on the receiver side can be a no-op. Add support for this hint and pass it on. Until sender
+        //       side sorting is implemented, setting this hint will throw an error on execution.
+        PinotLogicalSortExchange sortExchange = PinotLogicalSortExchange.create(windowInput,
+            RelDistributions.hash(windowGroup.keys.toList()), windowGroup.orderKeys, false, true);
         call.transformTo(LogicalWindow.create(window.getTraitSet(), sortExchange, window.constants, window.getRowType(),
             window.groups));
       }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 1081459dd7..f8859e668f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -20,10 +20,13 @@ package org.apache.pinot.query.planner.logical;
 
 import java.util.List;
 import java.util.Map;
+import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.SortExchange;
+import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.QueryPlan;
@@ -74,12 +77,12 @@ public class StagePlanner {
     // global root needs to send results back to the ROOT, a.k.a. the client response node. the last stage only has one
     // receiver so doesn't matter what the exchange type is. setting it to SINGLETON by default.
     StageNode globalSenderNode = new MailboxSendNode(globalStageRoot.getStageId(), globalStageRoot.getDataSchema(),
-        0, RelDistribution.Type.RANDOM_DISTRIBUTED, null);
+        0, RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false);
     globalSenderNode.addInput(globalStageRoot);
 
     StageNode globalReceiverNode =
         new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getStageId(),
-            RelDistribution.Type.RANDOM_DISTRIBUTED, null, globalSenderNode);
+            RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false, false, globalSenderNode);
 
     QueryPlan queryPlan = StageMetadataVisitor.attachMetadata(relRoot.fields, globalReceiverNode);
 
@@ -100,7 +103,19 @@ public class StagePlanner {
     if (isExchangeNode(node)) {
       StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
       RelDistribution distribution = ((Exchange) node).getDistribution();
-      return createSendReceivePair(nextStageRoot, distribution, currentStageId);
+      RelCollation collation = null;
+      boolean isSortOnSender = false;
+      boolean isSortOnReceiver = false;
+      if (isSortExchangeNode(node)) {
+        collation = ((SortExchange) node).getCollation();
+        if (node instanceof PinotLogicalSortExchange) {
+          // These flags only take meaning if the collation is not null or empty
+          isSortOnSender = ((PinotLogicalSortExchange) node).isSortOnSender();
+          isSortOnReceiver = ((PinotLogicalSortExchange) node).isSortOnReceiver();
+        }
+      }
+      return createSendReceivePair(nextStageRoot, distribution, collation, isSortOnSender, isSortOnReceiver,
+          currentStageId);
     } else {
       StageNode stageNode = RelToStageConverter.toStageNode(node, currentStageId);
       List<RelNode> inputs = node.getInputs();
@@ -118,7 +133,8 @@ public class StagePlanner {
     }
   }
 
-  private StageNode createSendReceivePair(StageNode nextStageRoot, RelDistribution distribution, int currentStageId) {
+  private StageNode createSendReceivePair(StageNode nextStageRoot, RelDistribution distribution, RelCollation collation,
+      boolean isSortOnSender, boolean isSortOnReceiver, int currentStageId) {
     List<Integer> distributionKeys = distribution.getKeys();
     RelDistribution.Type exchangeType = distribution.getType();
 
@@ -129,9 +145,11 @@ public class StagePlanner {
         ? new FieldSelectionKeySelector(distributionKeys) : null;
 
     StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
-        currentStageId, exchangeType, keySelector);
+        currentStageId, exchangeType, keySelector, collation == null ? null : collation.getFieldCollations(),
+        isSortOnSender);
     StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
-        nextStageRoot.getStageId(), exchangeType, keySelector, mailboxSender);
+        nextStageRoot.getStageId(), exchangeType, keySelector,
+        collation == null ? null : collation.getFieldCollations(), isSortOnSender, isSortOnReceiver, mailboxSender);
     mailboxSender.addInput(nextStageRoot);
 
     return mailboxReceiver;
@@ -141,6 +159,10 @@ public class StagePlanner {
     return (node instanceof Exchange);
   }
 
+  private boolean isSortExchangeNode(RelNode node) {
+    return (node instanceof SortExchange);
+  }
+
   private int getNewStageId() {
     return _stageIdCounter++;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
index cf90a0005c..97a019741f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
@@ -18,9 +18,16 @@
  */
 package org.apache.pinot.query.planner.stage;
 
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
@@ -32,6 +39,14 @@ public class MailboxReceiveNode extends AbstractStageNode {
   private RelDistribution.Type _exchangeType;
   @ProtoProperties
   private KeySelector<Object[], Object[]> _partitionKeySelector;
+  @ProtoProperties
+  private List<RexExpression> _collationKeys;
+  @ProtoProperties
+  private List<RelFieldCollation.Direction> _collationDirections;
+  @ProtoProperties
+  private boolean _isSortOnSender;
+  @ProtoProperties
+  private boolean _isSortOnReceiver;
 
   // this is only available during planning and should not be relied
   // on in any post-serialization code
@@ -43,11 +58,26 @@ public class MailboxReceiveNode extends AbstractStageNode {
 
   public MailboxReceiveNode(int stageId, DataSchema dataSchema, int senderStageId,
       RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
+      @Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender, boolean isSortOnReceiver,
       StageNode sender) {
     super(stageId, dataSchema);
     _senderStageId = senderStageId;
     _exchangeType = exchangeType;
     _partitionKeySelector = partitionKeySelector;
+    if (!CollectionUtils.isEmpty(fieldCollations)) {
+      _collationKeys = new ArrayList<>(fieldCollations.size());
+      _collationDirections = new ArrayList<>(fieldCollations.size());
+      for (RelFieldCollation fieldCollation : fieldCollations) {
+        _collationDirections.add(fieldCollation.getDirection());
+        _collationKeys.add(new RexExpression.InputRef(fieldCollation.getFieldIndex()));
+      }
+    } else {
+      _collationKeys = Collections.emptyList();
+      _collationDirections = Collections.emptyList();
+    }
+    _isSortOnSender = isSortOnSender;
+    Preconditions.checkState(!isSortOnSender, "Input shouldn't be sorted as ordering on send is not yet implemented!");
+    _isSortOnReceiver = isSortOnReceiver;
     _sender = sender;
   }
 
@@ -67,6 +97,22 @@ public class MailboxReceiveNode extends AbstractStageNode {
     return _partitionKeySelector;
   }
 
+  public List<RexExpression> getCollationKeys() {
+    return _collationKeys;
+  }
+
+  public List<RelFieldCollation.Direction> getCollationDirections() {
+    return _collationDirections;
+  }
+
+  public boolean isSortOnSender() {
+    return _isSortOnSender;
+  }
+
+  public boolean isSortOnReceiver() {
+    return _isSortOnReceiver;
+  }
+
   public StageNode getSender() {
     return _sender;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
index 4219590100..c98b82907a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
@@ -18,9 +18,16 @@
  */
 package org.apache.pinot.query.planner.stage;
 
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
 
@@ -32,17 +39,38 @@ public class MailboxSendNode extends AbstractStageNode {
   private RelDistribution.Type _exchangeType;
   @ProtoProperties
   private KeySelector<Object[], Object[]> _partitionKeySelector;
+  @ProtoProperties
+  private List<RexExpression> _collationKeys;
+  @ProtoProperties
+  private List<RelFieldCollation.Direction> _collationDirections;
+  @ProtoProperties
+  private boolean _isSortOnSender;
 
   public MailboxSendNode(int stageId) {
     super(stageId);
   }
 
   public MailboxSendNode(int stageId, DataSchema dataSchema, int receiverStageId,
-      RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector) {
+      RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
+      @Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender) {
     super(stageId, dataSchema);
     _receiverStageId = receiverStageId;
     _exchangeType = exchangeType;
     _partitionKeySelector = partitionKeySelector;
+    // TODO: Support ordering here if the 'fieldCollations' aren't empty and 'sortOnSender' is true
+    Preconditions.checkState(!isSortOnSender, "Ordering is not yet supported on Mailbox Send");
+    if (!CollectionUtils.isEmpty(fieldCollations) && isSortOnSender) {
+      _collationKeys = new ArrayList<>(fieldCollations.size());
+      _collationDirections = new ArrayList<>(fieldCollations.size());
+      for (RelFieldCollation fieldCollation : fieldCollations) {
+        _collationDirections.add(fieldCollation.getDirection());
+        _collationKeys.add(new RexExpression.InputRef(fieldCollation.getFieldIndex()));
+      }
+    } else {
+      _collationKeys = Collections.emptyList();
+      _collationDirections = Collections.emptyList();
+    }
+    _isSortOnSender = isSortOnSender;
   }
 
   public int getReceiverStageId() {
@@ -61,6 +89,18 @@ public class MailboxSendNode extends AbstractStageNode {
     return _partitionKeySelector;
   }
 
+  public List<RexExpression> getCollationKeys() {
+    return _collationKeys;
+  }
+
+  public List<RelFieldCollation.Direction> getCollationDirections() {
+    return _collationDirections;
+  }
+
+  public boolean isSortOnSender() {
+    return _isSortOnSender;
+  }
+
   @Override
   public String explain() {
     return "MAIL_SEND(" + _exchangeType + ")";
diff --git a/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
index b92fd79352..443998833c 100644
--- a/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
@@ -29,7 +29,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.SortExchange;
 import org.apache.calcite.rel.logical.LogicalSort;
-import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
@@ -81,7 +81,8 @@ public class PinotSortExchangeCopyRuleTest {
   @Test
   public void shouldMatchLimitNoOffsetNoSort() {
     // Given:
-    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+    SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY,
+        false, false);
     Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, null, literal(1));
     Mockito.when(_call.rel(0)).thenReturn(sort);
     Mockito.when(_call.rel(1)).thenReturn(exchange);
@@ -95,7 +96,7 @@ public class PinotSortExchangeCopyRuleTest {
 
     RelNode sortCopy = sortCopyCapture.getValue();
     Assert.assertTrue(sortCopy instanceof LogicalSort);
-    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
     Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
 
     LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
@@ -104,11 +105,68 @@ public class PinotSortExchangeCopyRuleTest {
     Assert.assertEquals((innerSort).fetch, literal(1));
   }
 
+  @Test
+  public void shouldMatchLimitNoOffsetYesSortNoSortEnabled() {
+    // Given:
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation,
+        false, false);
+    Sort sort = LogicalSort.create(exchange, collation, null, literal(1));
+    Mockito.when(_call.rel(0)).thenReturn(sort);
+    Mockito.when(_call.rel(1)).thenReturn(exchange);
+
+    // When:
+    PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+
+    // Then:
+    ArgumentCaptor<RelNode> sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class);
+    Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap());
+
+    RelNode sortCopy = sortCopyCapture.getValue();
+    Assert.assertTrue(sortCopy instanceof LogicalSort);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
+
+    LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
+    Assert.assertEquals(innerSort.getCollation().getKeys().size(), 1);
+    Assert.assertNull((innerSort).offset);
+    Assert.assertEquals((innerSort).fetch, literal(1));
+  }
+
+  @Test
+  public void shouldMatchLimitNoOffsetYesSortOnSender() {
+    // Given:
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation,
+        true, false);
+    Sort sort = LogicalSort.create(exchange, collation, null, literal(1));
+    Mockito.when(_call.rel(0)).thenReturn(sort);
+    Mockito.when(_call.rel(1)).thenReturn(exchange);
+
+    // When:
+    PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+
+    // Then:
+    ArgumentCaptor<RelNode> sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class);
+    Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap());
+
+    RelNode sortCopy = sortCopyCapture.getValue();
+    Assert.assertTrue(sortCopy instanceof LogicalSort);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
+
+    LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
+    Assert.assertEquals(innerSort.getCollation().getKeys().size(), 1);
+    Assert.assertNull((innerSort).offset);
+    Assert.assertEquals((innerSort).fetch, literal(1));
+  }
+
   @Test
   public void shouldMatchLimitNoOffsetYesSort() {
     // Given:
     RelCollation collation = RelCollations.of(1);
-    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation, false,
+        true);
     Sort sort = LogicalSort.create(exchange, collation, null, literal(1));
     Mockito.when(_call.rel(0)).thenReturn(sort);
     Mockito.when(_call.rel(1)).thenReturn(exchange);
@@ -122,7 +180,7 @@ public class PinotSortExchangeCopyRuleTest {
 
     RelNode sortCopy = sortCopyCapture.getValue();
     Assert.assertTrue(sortCopy instanceof LogicalSort);
-    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
     Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
 
     LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
@@ -134,7 +192,8 @@ public class PinotSortExchangeCopyRuleTest {
   @Test
   public void shouldMatchNoSortAndPushDownLimitPlusOffset() {
     // Given:
-    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+    SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY,
+        false, true);
     Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, literal(2), literal(1));
     Mockito.when(_call.rel(0)).thenReturn(sort);
     Mockito.when(_call.rel(1)).thenReturn(exchange);
@@ -148,7 +207,7 @@ public class PinotSortExchangeCopyRuleTest {
 
     RelNode sortCopy = sortCopyCapture.getValue();
     Assert.assertTrue(sortCopy instanceof LogicalSort);
-    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
     Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
 
     LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
@@ -161,7 +220,8 @@ public class PinotSortExchangeCopyRuleTest {
   public void shouldMatchSortOnly() {
     // Given:
     RelCollation collation = RelCollations.of(1);
-    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation, false,
+        true);
     Sort sort = LogicalSort.create(exchange, collation, null, null);
     Mockito.when(_call.rel(0)).thenReturn(sort);
     Mockito.when(_call.rel(1)).thenReturn(exchange);
@@ -175,7 +235,7 @@ public class PinotSortExchangeCopyRuleTest {
 
     RelNode sortCopy = sortCopyCapture.getValue();
     Assert.assertTrue(sortCopy instanceof LogicalSort);
-    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
     Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
 
     LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
@@ -188,7 +248,8 @@ public class PinotSortExchangeCopyRuleTest {
   public void shouldMatchLimitOffsetAndSort() {
     // Given:
     RelCollation collation = RelCollations.of(1);
-    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation, false,
+        true);
     Sort sort = LogicalSort.create(exchange, collation, literal(1), literal(2));
     Mockito.when(_call.rel(0)).thenReturn(sort);
     Mockito.when(_call.rel(1)).thenReturn(exchange);
@@ -202,7 +263,7 @@ public class PinotSortExchangeCopyRuleTest {
 
     RelNode sortCopy = sortCopyCapture.getValue();
     Assert.assertTrue(sortCopy instanceof LogicalSort);
-    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
     Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
 
     LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
@@ -215,7 +276,8 @@ public class PinotSortExchangeCopyRuleTest {
   public void shouldNotMatchOnlySortAlreadySorted() {
     // Given:
     RelCollation collation = RelCollations.of(1);
-    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation, false,
+        true);
     Sort sort = LogicalSort.create(exchange, collation, null, null);
     Mockito.when(_call.rel(0)).thenReturn(sort);
     Mockito.when(_call.rel(1)).thenReturn(exchange);
@@ -231,7 +293,8 @@ public class PinotSortExchangeCopyRuleTest {
   @Test
   public void shouldNotMatchOffsetNoLimitNoSort() {
     // Given:
-    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+    SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY,
+        false, true);
     Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, literal(1), null);
     Mockito.when(_call.rel(0)).thenReturn(sort);
     Mockito.when(_call.rel(1)).thenReturn(exchange);
diff --git a/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json b/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
index 13d505b3cb..749db3748c 100644
--- a/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
@@ -39,7 +39,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[dateTrunc('DAY', $4)])",
           "\n  LogicalSort(offset=[0], fetch=[10])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalSort(fetch=[10])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -52,7 +52,7 @@
           "Execution Plan",
           "\nLogicalProject(day=[dateTrunc('DAY', $4)])",
           "\n  LogicalSort(offset=[0], fetch=[10])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalSort(fetch=[10])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
index ab57467c57..e8e1631c62 100644
--- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
@@ -8,7 +8,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$0], ts=[$1], col3=[$3])",
           "\n  LogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n        LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
           "\n          LogicalExchange(distribution=[hash[0]])",
@@ -27,7 +27,7 @@
           "Execution Plan",
           "\nLogicalProject(value1=[$0], ts1=[$1], col3=[$3])",
           "\n  LogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n        LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
           "\n          LogicalExchange(distribution=[hash[0]])",
diff --git a/pinot-query-planner/src/test/resources/queries/OrderByPlans.json b/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
index 0b7f33e026..e0787faa3e 100644
--- a/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
@@ -8,7 +8,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$3])",
           "\n  LogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[3]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[3]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalSort(sort0=[$3], dir0=[ASC])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -21,7 +21,7 @@
           "Execution Plan",
           "\nLogicalProject(value1=[$3])",
           "\n  LogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[3]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[3]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalSort(sort0=[$3], dir0=[ASC])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -33,7 +33,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$3], dir0=[ASC], offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[3]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$3], dir0=[ASC], fetch=[10])",
           "\n      LogicalTableScan(table=[[a]])",
           "\n"
@@ -45,7 +45,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$3], sort1=[$1], dir0=[ASC], dir1=[DESC], offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[3, 1 DESC]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3, 1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$3], sort1=[$1], dir0=[ASC], dir1=[DESC], fetch=[10])",
           "\n      LogicalTableScan(table=[[b]])",
           "\n"
@@ -57,7 +57,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
           "\n        LogicalExchange(distribution=[hash[0]])",
@@ -72,7 +72,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
           "\n        LogicalExchange(distribution=[hash[0]])",
@@ -87,7 +87,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
           "\n        LogicalExchange(distribution=[hash[0]])",
@@ -102,7 +102,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
           "\n        LogicalExchange(distribution=[hash[0]])",
diff --git a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
index 4516ae1aa2..49312c481b 100644
--- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
@@ -61,7 +61,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[$3], col2=[$0])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($1)])])",
@@ -77,7 +77,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$1], $1=[$2])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($0)])])",
@@ -93,7 +93,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[20])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[20])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[$3], col2=[$0])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($1)])])",
@@ -211,7 +211,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], col3=[$0])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
@@ -284,7 +284,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
@@ -300,7 +300,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[100])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(fetch=[100])",
           "\n      LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
@@ -316,7 +316,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
           "\n      LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
@@ -434,7 +434,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[3]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$3], dir0=[ASC])",
           "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$2], col3=[$0])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
@@ -520,7 +520,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
           "\n        LogicalWindow(window#0=[window(partition {2} aggs [MIN($1)])])",
@@ -536,7 +536,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
@@ -552,7 +552,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
           "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
           "\n        LogicalWindow(window#0=[window(partition {2} aggs [MIN($1)])])",
@@ -683,7 +683,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], col3=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), COUNT($0)])])",
@@ -769,7 +769,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), MAX($1)])])",
@@ -785,7 +785,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n        LogicalWindow(window#0=[window(partition {1} aggs [SUM($0), COUNT($0), MIN($0)])])",
@@ -801,7 +801,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
           "\n      LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), MAX($1)])])",
@@ -817,7 +817,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), MAX($1)])])",
@@ -833,7 +833,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), COUNT($1)])])",
@@ -849,7 +849,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($4):DOUBLE NOT NULL, $5)])",
           "\n        LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), SUM($1), COUNT($1)])])",
@@ -865,7 +865,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[3]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$3], dir0=[ASC])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), COUNT($1)])])",
@@ -996,7 +996,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[3]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$3], dir0=[ASC])",
           "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4], col3=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), COUNT($0), MAX($0)])])",
@@ -1016,7 +1016,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2])",
           "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1030,7 +1030,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2])",
           "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1043,7 +1043,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1056,7 +1056,7 @@
           "Execution Plan",
           "\nLogicalProject(value1=[$2], avg=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1069,7 +1069,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2])",
           "\n  LogicalWindow(window#0=[window(order by [1] aggs [MAX($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1081,11 +1081,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
           "\n        LogicalWindow(window#0=[window(order by [2 DESC] aggs [MIN($1)])])",
-          "\n          LogicalSortExchange(distribution=[hash], collation=[[2 DESC]])",
+          "\n          PinotLogicalSortExchange(distribution=[hash], collation=[[2 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n            LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1097,11 +1097,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
-          "\n          LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n          PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n            LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1113,11 +1113,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
           "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
           "\n        LogicalWindow(window#0=[window(order by [2 DESC] aggs [MIN($1)])])",
-          "\n          LogicalSortExchange(distribution=[hash], collation=[[2 DESC]])",
+          "\n          PinotLogicalSortExchange(distribution=[hash], collation=[[2 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n            LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1130,7 +1130,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2], $1=[$3])",
           "\n  LogicalWindow(window#0=[window(order by [1] aggs [COUNT($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], $2=[SUBSTR($3, 0, 2)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1143,7 +1143,7 @@
           "Execution Plan",
           "\nLogicalProject(col2=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)])",
           "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalFilter(condition=[AND(>($2, 10), <=($2, 500))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -1157,7 +1157,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], $2=[CONCAT($3, '-', $1)])",
           "\n        LogicalFilter(condition=[OR(AND(<>($3, 'bar'), <>($3, 'foo')), >=($2, 42))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -1171,7 +1171,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[/(CAST($2):DOUBLE NOT NULL, $3)])",
           "\n  LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col3=[$2], $1=[CONCAT($3, '-', $1)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1184,7 +1184,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3], $1=[$4])",
           "\n  LogicalWindow(window#0=[window(order by [2] aggs [MAX($1), COUNT($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1198,7 +1198,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3], $1=[$4])",
           "\n  LogicalWindow(window#0=[window(order by [2] aggs [MAX($1), COUNT($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1211,7 +1211,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n  LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0), MIN($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1224,7 +1224,7 @@
           "Execution Plan",
           "\nLogicalProject(value1=[$1], avg=[/(CAST($2):DOUBLE NOT NULL, $3)], min=[$4])",
           "\n  LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0), MIN($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1237,7 +1237,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2], $1=[$3])",
           "\n  LogicalWindow(window#0=[window(order by [0] aggs [COUNT($1), MIN($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1249,11 +1249,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[DESC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0 DESC]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[DESC])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(order by [0, 2 DESC] aggs [SUM($1), COUNT($1)])])",
-          "\n          LogicalSortExchange(distribution=[hash], collation=[[0, 2 DESC]])",
+          "\n          PinotLogicalSortExchange(distribution=[hash], collation=[[0, 2 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n            LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1265,11 +1265,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n        LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0), MIN($0)])])",
-          "\n          LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n          PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n            LogicalProject(col3=[$2], col1=[$3])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1281,11 +1281,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$0], dir0=[DESC], offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[0 DESC]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$0], dir0=[DESC], fetch=[10])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(order by [0, 2 DESC] aggs [SUM($1), COUNT($1)])])",
-          "\n          LogicalSortExchange(distribution=[hash], collation=[[0, 2 DESC]])",
+          "\n          PinotLogicalSortExchange(distribution=[hash], collation=[[0, 2 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n            LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1298,7 +1298,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[REVERSE($2)], EXPR$1=[$3], EXPR$2=[$4])",
           "\n  LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), MAX($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1311,7 +1311,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n  LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col3=[$2], col1=[$3])",
           "\n        LogicalFilter(condition=[AND(>($2, 42), OR(=($3, 'chewbacca':VARCHAR(9)), =($3, 'vader':VARCHAR(9)), =($3, 'yoda':VARCHAR(9))))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -1325,7 +1325,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
           "\n  LogicalWindow(window#0=[window(order by [1] aggs [MIN($0), MAX($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[1]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col3=[$2], col1=[$3], $2=[REVERSE(CONCAT($3, ' ', $1))])",
           "\n        LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'baz'), <>($1, 'foo'))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -1339,7 +1339,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$1=[$5])",
           "\n  LogicalWindow(window#0=[window(order by [2] aggs [SUM($0), COUNT($0), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col3=[$2], col1=[$3], $2=[REVERSE(CONCAT($3, '-', $1))])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1417,7 +1417,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
           "\n        LogicalWindow(window#0=[window(partition {2} order by [2] aggs [MIN($1)])])",
@@ -1433,7 +1433,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1)])])",
@@ -1449,7 +1449,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
           "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
           "\n        LogicalWindow(window#0=[window(partition {2} order by [2] aggs [MIN($1)])])",
@@ -1624,7 +1624,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[3, 0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0, 2} order by [0, 2] aggs [SUM($1), COUNT($1)])])",
@@ -1640,7 +1640,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n        LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($0), COUNT($0), MIN($0)])])",
@@ -1656,7 +1656,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC], offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[3, 0]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC], fetch=[10])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0, 2} order by [0, 2] aggs [SUM($1), COUNT($1)])])",
@@ -1766,7 +1766,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1779,7 +1779,7 @@
           "Execution Plan",
           "\nLogicalProject(avg=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1792,7 +1792,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1805,7 +1805,7 @@
           "Execution Plan",
           "\nLogicalProject(value1=[$2], avg=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1818,7 +1818,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3])",
           "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [MAX($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1830,11 +1830,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
           "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
           "\n        LogicalWindow(window#0=[window(partition {2} order by [0] aggs [MIN($1)])])",
-          "\n          LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n          PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n            LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1846,11 +1846,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
-          "\n          LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n          PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n            LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1862,11 +1862,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[2]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
           "\n      LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
           "\n        LogicalWindow(window#0=[window(partition {2} order by [0] aggs [MIN($1)])])",
-          "\n          LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n          PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n            LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -1879,7 +1879,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3], $1=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {1} order by [2] aggs [COUNT($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash[1]], collation=[[2]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3], $3=[SUBSTR($3, 0, 2)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1892,7 +1892,7 @@
           "Execution Plan",
           "\nLogicalProject(col2=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalFilter(condition=[AND(>($2, 10), <=($2, 500))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -1906,7 +1906,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[$3], EXPR$1=[/(CAST($4):DOUBLE NOT NULL, $5)])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3], $3=[CONCAT($3, '-', $1)])",
           "\n        LogicalFilter(condition=[OR(AND(<>($3, 'bar'), <>($3, 'foo')), >=($2, 42))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -1920,7 +1920,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(partition {2} order by [1] aggs [SUM($0), COUNT($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[1]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[2]], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col3=[$2], $1=[REVERSE($1)], $2=[CONCAT($3, '-', $1)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1933,7 +1933,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3], $1=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {2} order by [1] aggs [MAX($1), COUNT($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[1]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[2]], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1947,7 +1947,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3], $1=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {2} order by [1] aggs [MAX($1), COUNT($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[1]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[2]], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1960,7 +1960,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$2=[$5])",
           "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1973,7 +1973,7 @@
           "Execution Plan",
           "\nLogicalProject(value1=[$2], avg=[/(CAST($3):DOUBLE NOT NULL, $4)], min=[$5])",
           "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1986,7 +1986,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3], $1=[$4])",
           "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [COUNT($1), MIN($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -1998,11 +1998,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC], offset=[0])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[3, 0 DESC]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3, 0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0, 2} order by [1, 2] aggs [SUM($1), COUNT($1)])])",
-          "\n          LogicalSortExchange(distribution=[hash[0, 2]], collation=[[1, 2]])",
+          "\n          PinotLogicalSortExchange(distribution=[hash[0, 2]], collation=[[1, 2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n            LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -2014,11 +2014,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$2=[$5])",
           "\n        LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
-          "\n          LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n          PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n            LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -2030,11 +2030,11 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC], offset=[0], fetch=[10])",
-          "\n  LogicalSortExchange(distribution=[hash], collation=[[3, 0 DESC]])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3, 0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n    LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC], fetch=[10])",
           "\n      LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$0])",
           "\n        LogicalWindow(window#0=[window(partition {0, 2} order by [1, 2] aggs [SUM($1), COUNT($1)])])",
-          "\n          LogicalSortExchange(distribution=[hash[0, 2]], collation=[[1, 2]])",
+          "\n          PinotLogicalSortExchange(distribution=[hash[0, 2]], collation=[[1, 2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n            LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n              LogicalTableScan(table=[[a]])",
           "\n"
@@ -2047,7 +2047,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3], $1=[$4], $2=[$5])",
           "\n  LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), MAX($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3], $3=[REVERSE($3)])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -2060,7 +2060,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$2=[$5])",
           "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1), COUNT($2)])])",
-          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
           "\n        LogicalFilter(condition=[AND(>($2, 42), OR(=($3, 'chewbacca':VARCHAR(9)), =($3, 'vader':VARCHAR(9)), =($3, 'yoda':VARCHAR(9))))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -2074,7 +2074,7 @@
           "Execution Plan",
           "\nLogicalProject($0=[$3], $1=[$4], $2=[$5])",
           "\n  LogicalWindow(window#0=[window(partition {2} order by [0] aggs [MIN($1), MAX($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col2=[$1], col3=[$2], col1=[$3], $3=[REVERSE(CONCAT($3, ' ', $1))])",
           "\n        LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'baz'), <>($1, 'foo'))])",
           "\n          LogicalTableScan(table=[[a]])",
@@ -2088,7 +2088,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[/(CAST($4):DOUBLE NOT NULL, $5)], EXPR$1=[$6])",
           "\n  LogicalWindow(window#0=[window(partition {3} order by [2] aggs [SUM($0), COUNT($0), COUNT($1)])])",
-          "\n    LogicalSortExchange(distribution=[hash[3]], collation=[[2]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[3]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalProject(col3=[$2], col1=[$3], $2=[CONCAT($3, '-', $1)], $3=[REVERSE(CONCAT($3, '-', $1))])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -2120,7 +2120,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$1], EXPR$1=[$2], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n  LogicalWindow(window#0=[window(order by [2 DESC, 1] aggs [SUM($0), COUNT($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash], collation=[[2 DESC, 1]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[2 DESC, 1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
           "\n        LogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalAggregate(group=[{2, 3}], EXPR$1=[COUNT()])",
@@ -2135,7 +2135,7 @@
           "Execution Plan",
           "\nLogicalProject(col1=[$1], EXPR$1=[$2], $2=[$3])",
           "\n  LogicalWindow(window#0=[window(partition {1} order by [2 DESC, 1] aggs [MAX($0)])])",
-          "\n    LogicalSortExchange(distribution=[hash[1]], collation=[[2 DESC, 1]])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2 DESC, 1]], isSortOnSender=[false], isSortOnReceiver=[true])",
           "\n      LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
           "\n        LogicalExchange(distribution=[hash[0, 1]])",
           "\n          LogicalAggregate(group=[{2, 3}], EXPR$1=[COUNT()])",
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 9436ec4bf0..b0d0433f42 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -215,7 +215,8 @@ public class QueryRunner {
               deadlineMs, distributedStagePlan.getMetadataMap());
       mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext,
           new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema()),
-          sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), sendNode.getStageId(),
+          sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(),
+          sendNode.getCollationDirections(), sendNode.isSortOnSender(), sendNode.getStageId(),
           sendNode.getReceiverStageId());
       int blockCounter = 0;
       while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 520392985e..fe9782c4c6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -23,19 +23,27 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.PriorityQueue;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.routing.VirtualServer;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.utils.SortUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.service.QueryConfig;
 import org.slf4j.Logger;
@@ -50,6 +58,9 @@ import org.slf4j.LoggerFactory;
  *  We use sendingStageInstance to deduce mailboxId and fetch the content from mailboxService.
  *  When exchangeType is Singleton, we find the mapping mailbox for the mailboxService. If not found, use empty list.
  *  When exchangeType is non-Singleton, we pull from each instance in round-robin way to get matched mailbox content.
+ *
+ *  TODO: Once sorting on the {@code MailboxSendOperator} is available, modify this to use a k-way merge instead of
+ *        resorting via the PriorityQueue.
  */
 public class MailboxReceiveOperator extends MultiStageOperator {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class);
@@ -62,10 +73,17 @@ public class MailboxReceiveOperator extends MultiStageOperator {
 
   private final MailboxService<TransferableBlock> _mailboxService;
   private final RelDistribution.Type _exchangeType;
+  private final List<RexExpression> _collationKeys;
+  private final List<RelFieldCollation.Direction> _collationDirections;
+  private final boolean _isSortOnSender;
+  private final boolean _isSortOnReceiver;
+  private final DataSchema _dataSchema;
   private final List<MailboxIdentifier> _sendingMailbox;
   private final long _deadlineTimestampNano;
+  private final PriorityQueue<Object[]> _priorityQueue;
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
+  private boolean _isSortedBlockConstructed;
 
   private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId,
       int receiverStageId, VirtualServerAddress receiver) {
@@ -77,16 +95,20 @@ public class MailboxReceiveOperator extends MultiStageOperator {
         receiverStageId);
   }
 
-  public MailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType, int senderStageId,
-      int receiverStageId) {
-    this(context, context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, senderStageId,
+  public MailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
+      List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender,
+      boolean isSortOnReceiver, DataSchema dataSchema, int senderStageId, int receiverStageId) {
+    this(context, context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, collationKeys,
+        collationDirections, isSortOnSender, isSortOnReceiver, dataSchema, senderStageId,
         receiverStageId, context.getTimeoutMs());
   }
 
   // TODO: Move deadlineInNanoSeconds to OperatorContext.
-  //TODO: Remove boxed timeoutMs value from here and use long deadlineMs from context.
+  // TODO: Remove boxed timeoutMs value from here and use long deadlineMs from context.
   public MailboxReceiveOperator(OpChainExecutionContext context, List<VirtualServer> sendingStageInstances,
-      RelDistribution.Type exchangeType, int senderStageId, int receiverStageId, Long timeoutMs) {
+      RelDistribution.Type exchangeType, List<RexExpression> collationKeys,
+      List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender, boolean isSortOnReceiver,
+      DataSchema dataSchema, int senderStageId, int receiverStageId, Long timeoutMs) {
     super(context);
     _mailboxService = context.getMailboxService();
     VirtualServerAddress receiver = context.getServer();
@@ -121,8 +143,20 @@ public class MailboxReceiveOperator extends MultiStageOperator {
         _sendingMailbox.add(toMailboxId(instance, jobId, senderStageId, receiverStageId, receiver));
       }
     }
+    _collationKeys = collationKeys;
+    _collationDirections = collationDirections;
+    _isSortOnSender = isSortOnSender;
+    _isSortOnReceiver = isSortOnReceiver;
+    _dataSchema = dataSchema;
+    if (CollectionUtils.isEmpty(collationKeys) || !_isSortOnReceiver) {
+      _priorityQueue = null;
+    } else {
+      _priorityQueue = new PriorityQueue<>(new SortUtils.SortComparator(collationKeys, collationDirections,
+          dataSchema, false));
+    }
     _upstreamErrorBlock = null;
     _serverIdx = 0;
+    _isSortedBlockConstructed = false;
   }
 
   public List<MailboxIdentifier> getSendingMailbox() {
@@ -143,6 +177,7 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   @Override
   protected TransferableBlock getNextBlock() {
     if (_upstreamErrorBlock != null) {
+      cleanUpResourcesOnError();
       return _upstreamErrorBlock;
     } else if (System.nanoTime() >= _deadlineTimestampNano) {
       return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
@@ -165,12 +200,20 @@ public class MailboxReceiveOperator extends MultiStageOperator {
           // Get null block when pulling times out from mailbox.
           if (block != null) {
             if (block.isErrorBlock()) {
+              cleanUpResourcesOnError();
               _upstreamErrorBlock =
                   TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
               return _upstreamErrorBlock;
             }
             if (!block.isEndOfStreamBlock()) {
-              return block;
+              if (_priorityQueue != null) {
+                // Ordering is enabled, add rows to the PriorityQueue
+                List<Object[]> container = block.getContainer();
+                _priorityQueue.addAll(container);
+              } else {
+                // Ordering is not enabled, return the input block as is
+                return block;
+              }
             } else {
               if (!block.getResultMetadata().isEmpty()) {
                 _operatorStatsMap.putAll(block.getResultMetadata());
@@ -185,6 +228,18 @@ public class MailboxReceiveOperator extends MultiStageOperator {
       }
     }
 
+    if (((openMailboxCount == 0) || (openMailboxCount <= eosMailboxCount))
+        && (!CollectionUtils.isEmpty(_priorityQueue)) && !_isSortedBlockConstructed) {
+      // Some data is present in the PriorityQueue, these need to be sent upstream
+      LinkedList<Object[]> rows = new LinkedList<>();
+      while (_priorityQueue.size() > 0) {
+        Object[] row = _priorityQueue.poll();
+        rows.addFirst(row);
+      }
+      _isSortedBlockConstructed = true;
+      return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
+    }
+
     // there are two conditions in which we should return EOS: (1) there were
     // no mailboxes to open (this shouldn't happen because the second condition
     // should be hit first, but is defensive) (2) every mailbox that was opened
@@ -196,6 +251,16 @@ public class MailboxReceiveOperator extends MultiStageOperator {
     return block;
   }
 
+  private void cleanUpResourcesOnError() {
+    if (_priorityQueue != null) {
+      _priorityQueue.clear();
+    }
+  }
+
+  public boolean hasCollationKeys() {
+    return !CollectionUtils.isEmpty(_collationKeys);
+  }
+
   @Override
   public void close() {
     super.close();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 940e668cdc..3be593a70a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -28,9 +28,11 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.routing.VirtualServer;
 import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -45,6 +47,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This {@code MailboxSendOperator} is created to send {@link TransferableBlock}s to the receiving end.
+ *
+ * TODO: Add support to sort the data prior to sending if sorting is enabled
  */
 public class MailboxSendOperator extends MultiStageOperator {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
@@ -56,6 +60,9 @@ public class MailboxSendOperator extends MultiStageOperator {
 
   private final MultiStageOperator _dataTableBlockBaseOperator;
   private final BlockExchange _exchange;
+  private final List<RexExpression> _collationKeys;
+  private final List<RelFieldCollation.Direction> _collationDirections;
+  private final boolean _isSortOnSender;
 
   @VisibleForTesting
   interface BlockExchangeFactory {
@@ -70,16 +77,19 @@ public class MailboxSendOperator extends MultiStageOperator {
   }
 
   public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator dataTableBlockBaseOperator,
-      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, int senderStageId,
+      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, List<RexExpression> collationKeys,
+      List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender, int senderStageId,
       int receiverStageId) {
-    this(context, dataTableBlockBaseOperator, exchangeType, keySelector,
+    this(context, dataTableBlockBaseOperator, exchangeType, keySelector, collationKeys, collationDirections,
+        isSortOnSender,
         (server) -> toMailboxId(server, context.getRequestId(), senderStageId, receiverStageId, context.getServer()),
         BlockExchange::getExchange, receiverStageId);
   }
 
   @VisibleForTesting
   MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator dataTableBlockBaseOperator,
-      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
+      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, List<RexExpression> collationKeys,
+      List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender,
       MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory, int receiverStageId) {
     super(context);
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
@@ -116,6 +126,10 @@ public class MailboxSendOperator extends MultiStageOperator {
         blockExchangeFactory.build(context.getMailboxService(), receivingMailboxes, exchangeType, keySelector, splitter,
             context.getDeadlineMs());
 
+    _collationKeys = collationKeys;
+    _collationDirections = collationDirections;
+    _isSortOnSender = isSortOnSender;
+
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType),
         String.format("Exchange type '%s' is not supported yet", exchangeType));
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 9eeea863a9..7668dc0397 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.runtime.operator;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -33,6 +32,7 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.utils.SortUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,15 +56,15 @@ public class SortOperator extends MultiStageOperator {
 
   public SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator,
       List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, int fetch, int offset,
-      DataSchema dataSchema) {
-    this(context, upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema,
+      DataSchema dataSchema, boolean isInputSorted) {
+    this(context, upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema, isInputSorted,
         SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY);
   }
 
   @VisibleForTesting
   SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
       List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
-      int defaultHolderCapacity) {
+      boolean isInputSorted, int defaultHolderCapacity) {
     super(context);
     _upstreamOperator = upstreamOperator;
     _fetch = fetch;
@@ -73,13 +73,15 @@ public class SortOperator extends MultiStageOperator {
     _upstreamErrorBlock = null;
     _isSortedBlockConstructed = false;
     _numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultHolderCapacity;
-    // When there's no collationKeys, the SortOperator is a simple selection with row trim on limit & offset
-    if (collationKeys.isEmpty()) {
+    // Under the following circumstances, the SortOperator is a simple selection with row trim on limit & offset:
+    // - There are no collationKeys
+    // - 'isInputSorted' is set to true indicating that the data was already sorted
+    if (collationKeys.isEmpty() || isInputSorted) {
       _priorityQueue = null;
       _rows = new ArrayList<>();
     } else {
       _priorityQueue = new PriorityQueue<>(_numRowsToKeep,
-          new SortComparator(collationKeys, collationDirections, dataSchema, false));
+          new SortUtils.SortComparator(collationKeys, collationDirections, dataSchema, false));
       _rows = null;
     }
   }
@@ -174,45 +176,4 @@ public class SortOperator extends MultiStageOperator {
       }
     }
   }
-
-  private static class SortComparator implements Comparator<Object[]> {
-    private final int _size;
-    private final int[] _valueIndices;
-    private final int[] _multipliers;
-    private final boolean[] _useDoubleComparison;
-
-    public SortComparator(List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
-        DataSchema dataSchema, boolean isNullHandlingEnabled) {
-      DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-      _size = collationKeys.size();
-      _valueIndices = new int[_size];
-      _multipliers = new int[_size];
-      _useDoubleComparison = new boolean[_size];
-      for (int i = 0; i < _size; i++) {
-        _valueIndices[i] = ((RexExpression.InputRef) collationKeys.get(i)).getIndex();
-        _multipliers[i] = collationDirections.get(i).isDescending() ? 1 : -1;
-        _useDoubleComparison[i] = columnDataTypes[_valueIndices[i]].isNumber();
-      }
-    }
-
-    @Override
-    public int compare(Object[] o1, Object[] o2) {
-      for (int i = 0; i < _size; i++) {
-        int index = _valueIndices[i];
-        Object v1 = o1[index];
-        Object v2 = o2[index];
-        int result;
-        if (_useDoubleComparison[i]) {
-          result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
-        } else {
-          //noinspection unchecked
-          result = ((Comparable) v1).compareTo(v2);
-        }
-        if (result != 0) {
-          return result * _multipliers[i];
-        }
-      }
-      return 0;
-    }
-  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
new file mode 100644
index 0000000000..820402f360
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.utils;
+
+import java.util.Comparator;
+import java.util.List;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
+
+
+public class SortUtils {
+
+  private SortUtils() {
+  }
+
+  public static class SortComparator implements Comparator<Object[]> {
+    private final int _size;
+    private final int[] _valueIndices;
+    private final int[] _multipliers;
+    private final boolean[] _useDoubleComparison;
+
+    public SortComparator(List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
+        DataSchema dataSchema, boolean isNullHandlingEnabled) {
+      DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+      _size = collationKeys.size();
+      _valueIndices = new int[_size];
+      _multipliers = new int[_size];
+      _useDoubleComparison = new boolean[_size];
+      for (int i = 0; i < _size; i++) {
+        _valueIndices[i] = ((RexExpression.InputRef) collationKeys.get(i)).getIndex();
+        _multipliers[i] = collationDirections.get(i).isDescending() ? 1 : -1;
+        _useDoubleComparison[i] = columnDataTypes[_valueIndices[i]].isNumber();
+      }
+    }
+
+    @Override
+    public int compare(Object[] o1, Object[] o2) {
+      for (int i = 0; i < _size; i++) {
+        int index = _valueIndices[i];
+        Object v1 = o1[index];
+        Object v2 = o2[index];
+        int result;
+        if (_useDoubleComparison[i]) {
+          result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
+        } else {
+          //noinspection unchecked
+          result = ((Comparable) v1).compareTo(v2);
+        }
+        if (result != 0) {
+          return result * _multipliers[i];
+        }
+      }
+      return 0;
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 4a9892a4dd..ca89173205 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -63,7 +63,8 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
   public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PlanRequestContext context) {
     MailboxReceiveOperator mailboxReceiveOperator =
         new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
-            node.getSenderStageId(), node.getStageId());
+            node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(), node.isSortOnReceiver(),
+            node.getDataSchema(), node.getSenderStageId(), node.getStageId());
     context.addReceivingMailboxes(mailboxReceiveOperator.getSendingMailbox());
     return mailboxReceiveOperator;
   }
@@ -72,7 +73,8 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
   public MultiStageOperator visitMailboxSend(MailboxSendNode node, PlanRequestContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     return new MailboxSendOperator(context.getOpChainExecutionContext(), nextOperator, node.getExchangeType(),
-        node.getPartitionKeySelector(), node.getStageId(), node.getReceiverStageId());
+        node.getPartitionKeySelector(), node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(),
+        node.getStageId(), node.getReceiverStageId());
   }
 
   @Override
@@ -120,8 +122,10 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
   @Override
   public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+    boolean isInputSorted =
+        nextOperator instanceof MailboxReceiveOperator && ((MailboxReceiveOperator) nextOperator).hasCollationKeys();
     return new SortOperator(context.getOpChainExecutionContext(), nextOperator, node.getCollationKeys(),
-        node.getCollationDirections(), node.getFetch(), node.getOffset(), node.getDataSchema());
+        node.getCollationDirections(), node.getFetch(), node.getOffset(), node.getDataSchema(), isInputSorted);
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index a295ef06f6..4d01c67105 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.service.dispatch;
 import com.google.common.annotations.VisibleForTesting;
 import io.grpc.Deadline;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -282,7 +283,8 @@ public class QueryDispatcher {
         new OpChainExecutionContext(mailboxService, jobId, stageId, server, timeoutMs, timeoutMs, stageMetadataMap);
     // timeout is set for reduce stage
     MailboxReceiveOperator mailboxReceiveOperator =
-        new MailboxReceiveOperator(context, RelDistribution.Type.RANDOM_DISTRIBUTED, stageId, reducerStageId);
+        new MailboxReceiveOperator(context, RelDistribution.Type.RANDOM_DISTRIBUTED, Collections.emptyList(),
+            Collections.emptyList(), false, false, dataSchema, stageId, reducerStageId);
     return mailboxReceiveOperator;
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index ebd712a0c4..bfbb977e7c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -20,15 +20,20 @@ package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Random;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.routing.VirtualServer;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -43,6 +48,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.STRING;
 
 
 public class MailboxReceiveOperatorTest {
@@ -66,6 +72,8 @@ public class MailboxReceiveOperatorTest {
 
   private final VirtualServerAddress _testAddr = new VirtualServerAddress("test", 123, 0);
 
+  private final Random _random = new Random();
+
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
@@ -80,12 +88,24 @@ public class MailboxReceiveOperatorTest {
   @Test
   public void shouldTimeoutOnExtraLongSleep()
       throws InterruptedException {
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     // shorter timeoutMs should result in error.
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 10L, 10L,
             new HashMap<>());
     MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 10L);
+        new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
+            collationDirections, false, sortOnReceiver, inSchema, 456, 789, 10L);
     Thread.sleep(200L);
     TransferableBlock mailbox = receiveOp.nextBlock();
     Assert.assertTrue(mailbox.isErrorBlock());
@@ -95,13 +115,15 @@ public class MailboxReceiveOperatorTest {
     // longer timeout or default timeout (10s) doesn't result in error.
     context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 2000L, 2000L,
         new HashMap<>());
-    receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 2000L);
+    receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
+        collationDirections, false, sortOnReceiver, inSchema, 456, 789, 2000L);
     Thread.sleep(200L);
     mailbox = receiveOp.nextBlock();
     Assert.assertFalse(mailbox.isErrorBlock());
     context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
         Long.MAX_VALUE, new HashMap<>());
-    receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, null);
+    receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
+        collationDirections, false, sortOnReceiver, inSchema, 456, 789, null);
     Thread.sleep(200L);
     mailbox = receiveOp.nextBlock();
     Assert.assertFalse(mailbox.isErrorBlock());
@@ -120,12 +142,24 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_server2.getHostname()).thenReturn("singleton");
     Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
 
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
     MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON, 456,
-            789, null);
+        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+            collationKeys, collationDirections, false, sortOnReceiver, inSchema, 456, 789, null);
   }
 
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
@@ -139,11 +173,24 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_server2.getHostname()).thenReturn("singleton");
     Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
 
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
     MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.RANGE_DISTRIBUTED, 456, 789, null);
+        RelDistribution.Type.RANGE_DISTRIBUTED, collationKeys, collationDirections, false, sortOnReceiver, inSchema,
+        456, 789, null);
   }
 
   @Test
@@ -167,13 +214,26 @@ public class MailboxReceiveOperatorTest {
     String toHost = "toHost";
     VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
 
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+            null);
 
     // Receive end of stream block directly when there is no match.
     Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -205,13 +265,26 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(true);
 
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+            null);
 
     // Receive end of stream block directly when mailbox is close.
     Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -245,13 +318,27 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     // Receive null mailbox during timeout.
     Mockito.when(_mailbox.receive()).thenReturn(null);
+
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+            null);
     // Receive NoOpBlock.
     Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());
   }
@@ -283,13 +370,27 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+            null);
     // Receive EosBloc.
     Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
   }
@@ -322,15 +423,32 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Object[] expRow = new Object[]{1, 1};
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-    Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+    Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+            null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
+    while (receivedBlock.isNoOpBlock()) {
+      receivedBlock = receiveOp.nextBlock();
+    }
     List<Object[]> resultRows = receivedBlock.getContainer();
     Assert.assertEquals(resultRows.size(), 1);
     Assert.assertEquals(resultRows.get(0), expRow);
@@ -363,14 +481,27 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Exception e = new Exception("errorBlock");
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
     Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(e));
+
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+            null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     Assert.assertTrue(receivedBlock.isErrorBlock());
     MetadataBlock error = (MetadataBlock) receivedBlock.getDataBlock();
@@ -407,15 +538,31 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Object[] expRow = new Object[]{1, 1};
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+            null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
+    while (receivedBlock.isNoOpBlock()) {
+      receivedBlock = receiveOp.nextBlock();
+    }
     List<Object[]> resultRows = receivedBlock.getContainer();
     Assert.assertEquals(resultRows.size(), 1);
     Assert.assertEquals(resultRows.get(0), expRow);
@@ -444,7 +591,7 @@ public class MailboxReceiveOperatorTest {
         new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
-    Mockito.when(_mailbox.receive()).thenReturn(null);
+    Mockito.when(_mailbox.receive()).thenReturn(null, TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
         new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
@@ -452,15 +599,31 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Object[] expRow = new Object[]{1, 1};
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+            null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
+    while (receivedBlock.isNoOpBlock()) {
+      receivedBlock = receiveOp.nextBlock();
+    }
     List<Object[]> resultRows = receivedBlock.getContainer();
     Assert.assertEquals(resultRows.size(), 1);
     Assert.assertEquals(resultRows.get(0), expRow);
@@ -508,7 +671,8 @@ public class MailboxReceiveOperatorTest {
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+            Collections.emptyList(), Collections.emptyList(), false, false, inSchema, stageId,
+            DEFAULT_RECEIVER_STAGE_ID, null);
     // Receive first block from first server.
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     List<Object[]> resultRows = receivedBlock.getContainer();
@@ -560,13 +724,25 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
+
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+            null);
     // Receive error block from first server.
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     Assert.assertTrue(receivedBlock.isErrorBlock());
@@ -606,14 +782,191 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
+
+    // Choose whether to exercise the ordering code path or not
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    boolean sortOnReceiver = false;
+    if (_random.nextBoolean()) {
+      collationKeys.add(new RexExpression.InputRef(0));
+      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+      sortOnReceiver = true;
+    }
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+            null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     Assert.assertTrue(receivedBlock.isErrorBlock(), "server-1 should have returned an error-block");
   }
+
+  @Test
+  public void shouldReceiveMailboxFromTwoServersWithCollationKey()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
+
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+        new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Object[] expRow1 = new Object[]{3, 3};
+    Object[] expRow2 = new Object[]{1, 1};
+    Mockito.when(_mailbox.receive())
+        .thenReturn(OperatorTestUtil.block(inSchema, expRow1), OperatorTestUtil.block(inSchema, expRow2),
+            TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    Object[] expRow3 = new Object[]{4, 2};
+    Object[] expRow4 = new Object[]{2, 4};
+    Object[] expRow5 = new Object[]{-1, 95};
+    JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+        new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3),
+        OperatorTestUtil.block(inSchema, expRow4), OperatorTestUtil.block(inSchema, expRow5),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // Setup the collation key and direction
+    List<RexExpression> collationKeys = new ArrayList<>(Collections.singletonList(new RexExpression.InputRef(0)));
+    RelFieldCollation.Direction direction = _random.nextBoolean() ? RelFieldCollation.Direction.ASCENDING
+        : RelFieldCollation.Direction.DESCENDING;
+    List<RelFieldCollation.Direction> collationDirection = new ArrayList<>(Collections.singletonList(direction));
+
+    OpChainExecutionContext context =
+        new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
+            Long.MAX_VALUE, new HashMap<>());
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection, false, true, inSchema, stageId,
+        DEFAULT_RECEIVER_STAGE_ID, null);
+
+    // Receive a set of no-op blocks and skip over them
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    while (receivedBlock.isNoOpBlock()) {
+      receivedBlock = receiveOp.nextBlock();
+    }
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    // All blocks should be returned together since ordering was required
+    Assert.assertEquals(resultRows.size(), 5);
+    if (direction == RelFieldCollation.Direction.ASCENDING) {
+      Assert.assertEquals(resultRows.get(0), expRow5);
+      Assert.assertEquals(resultRows.get(1), expRow2);
+      Assert.assertEquals(resultRows.get(2), expRow4);
+      Assert.assertEquals(resultRows.get(3), expRow1);
+      Assert.assertEquals(resultRows.get(4), expRow3);
+    } else {
+      Assert.assertEquals(resultRows.get(0), expRow3);
+      Assert.assertEquals(resultRows.get(1), expRow1);
+      Assert.assertEquals(resultRows.get(2), expRow4);
+      Assert.assertEquals(resultRows.get(3), expRow2);
+      Assert.assertEquals(resultRows.get(4), expRow5);
+    }
+
+    receivedBlock = receiveOp.nextBlock();
+    Assert.assertTrue(receivedBlock.isEndOfStreamBlock());
+  }
+
+  @Test
+  public void shouldReceiveMailboxFromTwoServersWithCollationKeyTwoColumns()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
+
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2", "col3"},
+        new DataSchema.ColumnDataType[]{INT, INT, STRING});
+    JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+        new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Object[] expRow1 = new Object[]{3, 3, "queen"};
+    Object[] expRow2 = new Object[]{1, 1, "pink floyd"};
+    Mockito.when(_mailbox.receive())
+        .thenReturn(OperatorTestUtil.block(inSchema, expRow1), OperatorTestUtil.block(inSchema, expRow2),
+            TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    Object[] expRow3 = new Object[]{42, 2, "pink floyd"};
+    Object[] expRow4 = new Object[]{2, 4, "aerosmith"};
+    Object[] expRow5 = new Object[]{-1, 95, "foo fighters"};
+    JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+        new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3),
+        OperatorTestUtil.block(inSchema, expRow4), OperatorTestUtil.block(inSchema, expRow5),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // Setup the collation key and direction
+    List<RexExpression> collationKeys = new ArrayList<>(Arrays.asList(new RexExpression.InputRef(2),
+        new RexExpression.InputRef(0)));
+    RelFieldCollation.Direction direction1 = _random.nextBoolean() ? RelFieldCollation.Direction.ASCENDING
+        : RelFieldCollation.Direction.DESCENDING;
+    RelFieldCollation.Direction direction2 = RelFieldCollation.Direction.ASCENDING;
+    List<RelFieldCollation.Direction> collationDirection = new ArrayList<>(Arrays.asList(direction1, direction2));
+
+    OpChainExecutionContext context =
+        new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
+            Long.MAX_VALUE, new HashMap<>());
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection, false, true, inSchema, stageId,
+        DEFAULT_RECEIVER_STAGE_ID, null);
+
+    // Receive a set of no-op blocks and skip over them
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    while (receivedBlock.isNoOpBlock()) {
+      receivedBlock = receiveOp.nextBlock();
+    }
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    // All blocks should be returned together since ordering was required
+    Assert.assertEquals(resultRows.size(), 5);
+    if (direction1 == RelFieldCollation.Direction.ASCENDING) {
+      Assert.assertEquals(resultRows.get(0), expRow4);
+      Assert.assertEquals(resultRows.get(1), expRow5);
+      Assert.assertEquals(resultRows.get(2), expRow2);
+      Assert.assertEquals(resultRows.get(3), expRow3);
+      Assert.assertEquals(resultRows.get(4), expRow1);
+    } else {
+      Assert.assertEquals(resultRows.get(0), expRow1);
+      Assert.assertEquals(resultRows.get(1), expRow2);
+      Assert.assertEquals(resultRows.get(2), expRow3);
+      Assert.assertEquals(resultRows.get(3), expRow5);
+      Assert.assertEquals(resultRows.get(4), expRow4);
+    }
+
+    receivedBlock = receiveOp.nextBlock();
+    Assert.assertTrue(receivedBlock.isEndOfStreamBlock());
+  }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 13735ed883..72d5ba66b8 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -90,7 +90,7 @@ public class MailboxSendOperatorTest {
     OpChainExecutionContext context = getOpChainContext(deadlineMs);
 
     MailboxSendOperator operator =
-        new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+        new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, null, null, false,
             server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
                 DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID);
 
@@ -112,7 +112,7 @@ public class MailboxSendOperatorTest {
     OpChainExecutionContext context = getOpChainContext(deadlineMs);
 
     MailboxSendOperator operator =
-        new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+        new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, null, null, false,
             server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
                 DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID);
     TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
@@ -134,7 +134,7 @@ public class MailboxSendOperatorTest {
     OpChainExecutionContext context = getOpChainContext(deadlineMs);
 
     MailboxSendOperator operator =
-        new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+        new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, null, null, false,
             server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
                 DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID);
     Mockito.when(_input.nextBlock()).thenThrow(new RuntimeException("foo!"));
@@ -157,7 +157,7 @@ public class MailboxSendOperatorTest {
     OpChainExecutionContext context = getOpChainContext(deadlineMs);
 
     MailboxSendOperator operator =
-        new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+        new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, null, null, false,
             server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
                 DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID);
 
@@ -180,7 +180,7 @@ public class MailboxSendOperatorTest {
     OpChainExecutionContext context = getOpChainContext(deadlineMs);
 
     MailboxSendOperator operator =
-        new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+        new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, null, null, false,
             server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
                 DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID);
     TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{}));
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index 11e525221d..f2edadeae5 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -70,7 +70,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
@@ -89,7 +89,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
 
@@ -107,7 +107,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
@@ -125,7 +125,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -141,6 +141,31 @@ public class SortOperatorTest {
     Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
   }
 
+  @Test
+  public void shouldConsumeAndSkipSortInputOneBlockWithTwoRowsInputSorted() {
+    // Given:
+    List<RexExpression> collation = collation(0);
+    List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+    SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0,
+        schema, true);
+
+    // Purposefully setting input as unsorted order for validation but 'isInputSorted' should only be true if actually
+    // sorted
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // When:
+    TransferableBlock block = op.nextBlock(); // construct
+    TransferableBlock block2 = op.nextBlock(); // eos
+
+    // Then:
+    Assert.assertEquals(block.getNumRows(), 2);
+    Assert.assertEquals(block.getContainer().get(0), new Object[]{2});
+    Assert.assertEquals(block.getContainer().get(1), new Object[]{1});
+    Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+  }
+
   @Test
   public void shouldConsumeAndSortOnNonZeroIdxCollation() {
     // Given:
@@ -148,7 +173,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"ignored", "sort"}, new DataSchema.ColumnDataType[]{INT, INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1, 2}, new Object[]{2, 1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -171,7 +196,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{STRING});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{"b"}, new Object[]{"a"}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -194,7 +219,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.DESCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -217,7 +242,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 1, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 1, schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -233,6 +258,30 @@ public class SortOperatorTest {
     Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
   }
 
+  @Test
+  public void shouldOffsetSortInputOneBlockWithThreeRowsInputSorted() {
+    // Given:
+    List<RexExpression> collation = collation(0);
+    List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+    SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 1,
+        schema, true);
+
+    // Set input rows as sorted since input is expected to be sorted
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}, new Object[]{2}, new Object[]{3}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // When:
+    TransferableBlock block = op.nextBlock(); // construct
+    TransferableBlock block2 = op.nextBlock(); // eos
+
+    // Then:
+    Assert.assertEquals(block.getNumRows(), 2);
+    Assert.assertEquals(block.getContainer().get(0), new Object[]{2});
+    Assert.assertEquals(block.getContainer().get(1), new Object[]{3});
+    Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+  }
+
   @Test
   public void shouldOffsetLimitSortInputOneBlockWithThreeRows() {
     // Given:
@@ -240,7 +289,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 1, 1, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 1, 1, schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -255,6 +304,29 @@ public class SortOperatorTest {
     Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
   }
 
+  @Test
+  public void shouldOffsetLimitSortInputOneBlockWithThreeRowsInputSorted() {
+    // Given:
+    List<RexExpression> collation = collation(0);
+    List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+    SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 1, 1,
+        schema, true);
+
+    // Set input rows as sorted since input is expected to be sorted
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}, new Object[]{2}, new Object[]{3}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // When:
+    TransferableBlock block = op.nextBlock(); // construct
+    TransferableBlock block2 = op.nextBlock(); // eos
+
+    // Then:
+    Assert.assertEquals(block.getNumRows(), 1);
+    Assert.assertEquals(block.getContainer().get(0), new Object[]{2});
+    Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+  }
+
   @Test
   public void shouldRespectDefaultLimit() {
     // Given:
@@ -262,7 +334,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 0, 0, schema, 1);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 0, 0, schema, false, 1);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -284,7 +356,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, -1, 0, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, -1, 0, schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -305,7 +377,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}))
         .thenReturn(block(schema, new Object[]{1}))
@@ -322,6 +394,31 @@ public class SortOperatorTest {
     Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
   }
 
+  @Test
+  public void shouldConsumeAndSortTwoInputBlocksWithOneRowEachInputSorted() {
+    // Given:
+    List<RexExpression> collation = collation(0);
+    List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+    SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0,
+        schema, true);
+
+    // Set input rows as sorted since input is expected to be sorted
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}))
+        .thenReturn(block(schema, new Object[]{2}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // When:
+    TransferableBlock block = op.nextBlock(); // construct
+    TransferableBlock block2 = op.nextBlock(); // eos
+
+    // Then:
+    Assert.assertEquals(block.getNumRows(), 2);
+    Assert.assertEquals(block.getContainer().get(0), new Object[]{1});
+    Assert.assertEquals(block.getContainer().get(1), new Object[]{2});
+    Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+  }
+
   @Test
   public void shouldBreakTiesUsingSecondCollationKey() {
     // Given:
@@ -329,7 +426,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -354,7 +451,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.DESCENDING);
     DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -379,7 +476,7 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}))
         .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()).thenReturn(block(schema, new Object[]{1}))
@@ -397,6 +494,32 @@ public class SortOperatorTest {
     Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
   }
 
+  @Test
+  public void shouldHandleNoOpUpstreamBlockWhileConstructingInputSorted() {
+    // Given:
+    List<RexExpression> collation = collation(0);
+    List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+    SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0,
+        schema, true);
+
+    // Set input rows as sorted since input is expected to be sorted
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}))
+        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()).thenReturn(block(schema, new Object[]{2}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // When:
+    op.nextBlock(); // consume up until NOOP, create NOOP
+    TransferableBlock block = op.nextBlock(); // construct
+    TransferableBlock block2 = op.nextBlock(); // eos
+
+    // Then:
+    Assert.assertEquals(block.getNumRows(), 2);
+    Assert.assertEquals(block.getContainer().get(0), new Object[]{1});
+    Assert.assertEquals(block.getContainer().get(1), new Object[]{2});
+    Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+  }
+
   private static List<RexExpression> collation(int... indexes) {
     return Arrays.stream(indexes).mapToObj(RexExpression.InputRef::new).collect(Collectors.toList());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org