You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/04/07 17:47:15 UTC

[pinot] branch master updated: [multistage] Split MailboxReceiveOperator into sorted and non-sorted versions (#10570)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6283ee7fb8 [multistage] Split MailboxReceiveOperator into sorted and non-sorted versions (#10570)
6283ee7fb8 is described below

commit 6283ee7fb8e583166966f4f5a520310f135c50d3
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Fri Apr 7 10:47:08 2023 -0700

    [multistage] Split MailboxReceiveOperator into sorted and non-sorted versions (#10570)
    
    * Split MailboxReceiveOperator into sorted and non-sorted versions
      - sorted mailbox receiver will keep polling for data from all the mailboxes until it gets only 'null' blocks or gets EOS from all mailboxes.
      - non-sorted mailbox receiver will return immediately when data arrives (behavior identical to that prior to #10408)
    * Fix tests - ORDER BY with only limit should set isSortOnReceiver to false
      - when there's a fetch/offset in SortNode but no Collation, skip sorting on the receiver, which speeds up the query
---
 .../rel/rules/PinotSortExchangeNodeInsertRule.java |   2 +-
 .../test/resources/queries/BasicQueryPlans.json    |   4 +-
 .../resources/queries/WindowFunctionPlans.json     |  20 +-
 .../operator/BaseMailboxReceiveOperator.java       | 136 ++++++++
 .../runtime/operator/MailboxReceiveOperator.java   | 176 +---------
 .../pinot/query/runtime/operator/SortOperator.java |   4 +-
 .../operator/SortedMailboxReceiveOperator.java     | 200 ++++++++++++
 .../query/runtime/operator/utils/SortUtils.java    |  13 +-
 .../query/runtime/plan/PhysicalPlanVisitor.java    |  24 +-
 .../query/service/dispatch/QueryDispatcher.java    |   4 +-
 .../operator/MailboxReceiveOperatorTest.java       | 361 +--------------------
 ....java => SortedMailboxReceiveOperatorTest.java} | 332 ++++++++-----------
 12 files changed, 541 insertions(+), 735 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
index 28a5018a08..56d08fd9ed 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
@@ -70,7 +70,7 @@ public class PinotSortExchangeNodeInsertRule extends RelOptRule {
         RelDistributions.hash(Collections.emptyList()),
         sort.getCollation(),
         false,
-        true);
+        !sort.getCollation().getKeys().isEmpty());
     call.transformTo(LogicalSort.create(exchange, sort.getCollation(), sort.offset, sort.fetch));
   }
 }
diff --git a/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json b/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
index f90469bc5e..22a6946d00 100644
--- a/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
@@ -39,7 +39,7 @@
           "Execution Plan",
           "\nLogicalProject(EXPR$0=[dateTrunc('DAY', $4)])",
           "\n  LogicalSort(offset=[0], fetch=[10])",
-          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
           "\n      LogicalSort(fetch=[10])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
@@ -52,7 +52,7 @@
           "Execution Plan",
           "\nLogicalProject(day=[dateTrunc('DAY', $4)])",
           "\n  LogicalSort(offset=[0], fetch=[10])",
-          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
           "\n      LogicalSort(fetch=[10])",
           "\n        LogicalTableScan(table=[[a]])",
           "\n"
diff --git a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
index f2cb699f19..ee277e54b0 100644
--- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
@@ -77,7 +77,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], $1=[$2])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($1)])])",
@@ -300,7 +300,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[100])",
-          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
           "\n    LogicalSort(fetch=[100])",
           "\n      LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
           "\n        LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
@@ -536,7 +536,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(partition {1} aggs [SUM($2), COUNT($2)])])",
@@ -785,7 +785,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n        LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1), MIN($1)])])",
@@ -1097,7 +1097,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(order by [1] aggs [SUM($2), COUNT($2)])])",
@@ -1265,7 +1265,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n        LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
@@ -1433,7 +1433,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($2), COUNT($2)])])",
@@ -1640,7 +1640,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
           "\n        LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
@@ -1846,7 +1846,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
           "\n        LogicalWindow(window#0=[window(partition {1} order by [0] aggs [SUM($2), COUNT($2)])])",
@@ -2014,7 +2014,7 @@
         "output": [
           "Execution Plan",
           "\nLogicalSort(offset=[0], fetch=[10])",
-          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
           "\n    LogicalSort(fetch=[10])",
           "\n      LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$2=[$5])",
           "\n        LogicalWindow(window#0=[window(partition {0} order by [1] aggs [SUM($2), COUNT($2), MIN($2)])])",
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
new file mode 100644
index 0000000000..0237c11393
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.service.QueryConfig;
+
+
+/**
+ * Base class to be used by the various MailboxReceiveOperators such as the sorted and non-sorted versions. This
+ * class contains the common logic needed for MailboxReceive
+ *
+ * BaseMailboxReceiveOperator receives mailbox from mailboxService from sendingStageInstances.
+ * We use sendingStageInstance to deduce mailboxId and fetch the content from mailboxService.
+ * When exchangeType is Singleton, we find the mapping mailbox for the mailboxService. If not found, use empty list.
+ * When exchangeType is non-Singleton, we pull from each instance in round-robin way to get matched mailbox content.
+ */
+public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
+
+  // TODO: Unify SUPPORTED_EXCHANGE_TYPES with MailboxSendOperator.
+  protected static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPES =
+      ImmutableSet.of(RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED,
+          RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED);
+
+  protected final MailboxService<TransferableBlock> _mailboxService;
+  protected final RelDistribution.Type _exchangeType;
+  protected final List<MailboxIdentifier> _sendingMailbox;
+  protected final long _deadlineTimestampNano;
+  protected int _serverIdx;
+  protected TransferableBlock _upstreamErrorBlock;
+
+  protected static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId,
+      int receiverStageId, VirtualServerAddress receiver) {
+    return new JsonMailboxIdentifier(
+        String.format("%s_%s", jobId, senderStageId),
+        new VirtualServerAddress(sender),
+        receiver,
+        senderStageId,
+        receiverStageId);
+  }
+
+  public BaseMailboxReceiveOperator(OpChainExecutionContext context, List<VirtualServer> sendingStageInstances,
+      RelDistribution.Type exchangeType, int senderStageId, int receiverStageId, Long timeoutMs) {
+    super(context);
+    _mailboxService = context.getMailboxService();
+    VirtualServerAddress receiver = context.getServer();
+    long jobId = context.getRequestId();
+    Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
+        "Exchange/Distribution type: " + exchangeType + " is not supported!");
+    long timeoutNano = (timeoutMs != null ? timeoutMs : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS) * 1_000_000L;
+    _deadlineTimestampNano = timeoutNano + System.nanoTime();
+
+    _exchangeType = exchangeType;
+    if (_exchangeType == RelDistribution.Type.SINGLETON) {
+      VirtualServer singletonInstance = null;
+      for (VirtualServer serverInstance : sendingStageInstances) {
+        if (serverInstance.getHostname().equals(_mailboxService.getHostname())
+            && serverInstance.getQueryMailboxPort() == _mailboxService.getMailboxPort()) {
+          Preconditions.checkState(singletonInstance == null, "multiple instance found for singleton exchange type!");
+          singletonInstance = serverInstance;
+        }
+      }
+
+      if (singletonInstance == null) {
+        // TODO: fix WorkerManager assignment, this should not happen if we properly assign workers.
+        // see: https://github.com/apache/pinot/issues/9611
+        _sendingMailbox = Collections.emptyList();
+      } else {
+        _sendingMailbox =
+            Collections.singletonList(toMailboxId(singletonInstance, jobId, senderStageId, receiverStageId, receiver));
+      }
+    } else {
+      _sendingMailbox = new ArrayList<>(sendingStageInstances.size());
+      for (VirtualServer instance : sendingStageInstances) {
+        _sendingMailbox.add(toMailboxId(instance, jobId, senderStageId, receiverStageId, receiver));
+      }
+    }
+    _upstreamErrorBlock = null;
+    _serverIdx = 0;
+  }
+
+  public List<MailboxIdentifier> getSendingMailbox() {
+    return _sendingMailbox;
+  }
+
+  @Override
+  public List<MultiStageOperator> getChildOperators() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
+      _mailboxService.releaseReceivingMailbox(sendingMailbox);
+    }
+  }
+
+  @Override
+  public void cancel(Throwable t) {
+    super.cancel(t);
+    for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
+      _mailboxService.releaseReceivingMailbox(sendingMailbox);
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index f65f9b8d15..b2e70c1dcf 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -18,35 +18,17 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
-import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.routing.VirtualServer;
-import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.operator.utils.SortUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.service.QueryConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,119 +36,22 @@ import org.slf4j.LoggerFactory;
 /**
  * This {@code MailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serve it out from the
  * {@link MultiStageOperator#getNextBlock()}()} API.
- *
- *  MailboxReceiveOperator receives mailbox from mailboxService from sendingStageInstances.
- *  We use sendingStageInstance to deduce mailboxId and fetch the content from mailboxService.
- *  When exchangeType is Singleton, we find the mapping mailbox for the mailboxService. If not found, use empty list.
- *  When exchangeType is non-Singleton, we pull from each instance in round-robin way to get matched mailbox content.
- *
- *  TODO: Once sorting on the {@code MailboxSendOperator} is available, modify this to use a k-way merge instead of
- *        resorting via the PriorityQueue.
  */
-public class MailboxReceiveOperator extends MultiStageOperator {
+public class MailboxReceiveOperator extends BaseMailboxReceiveOperator {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class);
   private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
 
-  // TODO: Unify SUPPORTED_EXCHANGE_TYPES with MailboxSendOperator.
-  private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPES =
-      ImmutableSet.of(RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED,
-          RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED);
-
-  private final MailboxService<TransferableBlock> _mailboxService;
-  private final RelDistribution.Type _exchangeType;
-  private final List<RexExpression> _collationKeys;
-  private final List<RelFieldCollation.Direction> _collationDirections;
-  private final boolean _isSortOnSender;
-  private final boolean _isSortOnReceiver;
-  private final DataSchema _dataSchema;
-  private final List<MailboxIdentifier> _sendingMailbox;
-  private final long _deadlineTimestampNano;
-  private final PriorityQueue<Object[]> _priorityQueue;
-  private int _serverIdx;
-  private TransferableBlock _upstreamErrorBlock;
-  private boolean _isSortedBlockConstructed;
-
-  private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId,
-      int receiverStageId, VirtualServerAddress receiver) {
-    return new JsonMailboxIdentifier(
-        String.format("%s_%s", jobId, senderStageId),
-        new VirtualServerAddress(sender),
-        receiver,
-        senderStageId,
-        receiverStageId);
-  }
-
-  public MailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
-      List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender,
-      boolean isSortOnReceiver, DataSchema dataSchema, int senderStageId, int receiverStageId) {
-    this(context, context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, collationKeys,
-        collationDirections, isSortOnSender, isSortOnReceiver, dataSchema, senderStageId,
+  public MailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType, int senderStageId,
+      int receiverStageId) {
+    this(context, context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, senderStageId,
         receiverStageId, context.getTimeoutMs());
   }
 
   // TODO: Move deadlineInNanoSeconds to OperatorContext.
   // TODO: Remove boxed timeoutMs value from here and use long deadlineMs from context.
   public MailboxReceiveOperator(OpChainExecutionContext context, List<VirtualServer> sendingStageInstances,
-      RelDistribution.Type exchangeType, List<RexExpression> collationKeys,
-      List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender, boolean isSortOnReceiver,
-      DataSchema dataSchema, int senderStageId, int receiverStageId, Long timeoutMs) {
-    super(context);
-    _mailboxService = context.getMailboxService();
-    VirtualServerAddress receiver = context.getServer();
-    long jobId = context.getRequestId();
-    Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
-        "Exchange/Distribution type: " + exchangeType + " is not supported!");
-    long timeoutNano = (timeoutMs != null ? timeoutMs : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS) * 1_000_000L;
-    _deadlineTimestampNano = timeoutNano + System.nanoTime();
-
-    _exchangeType = exchangeType;
-    if (_exchangeType == RelDistribution.Type.SINGLETON) {
-      VirtualServer singletonInstance = null;
-      for (VirtualServer serverInstance : sendingStageInstances) {
-        if (serverInstance.getHostname().equals(_mailboxService.getHostname())
-            && serverInstance.getQueryMailboxPort() == _mailboxService.getMailboxPort()) {
-          Preconditions.checkState(singletonInstance == null, "multiple instance found for singleton exchange type!");
-          singletonInstance = serverInstance;
-        }
-      }
-
-      if (singletonInstance == null) {
-        // TODO: fix WorkerManager assignment, this should not happen if we properly assign workers.
-        // see: https://github.com/apache/pinot/issues/9611
-        _sendingMailbox = Collections.emptyList();
-      } else {
-        _sendingMailbox =
-            Collections.singletonList(toMailboxId(singletonInstance, jobId, senderStageId, receiverStageId, receiver));
-      }
-    } else {
-      _sendingMailbox = new ArrayList<>(sendingStageInstances.size());
-      for (VirtualServer instance : sendingStageInstances) {
-        _sendingMailbox.add(toMailboxId(instance, jobId, senderStageId, receiverStageId, receiver));
-      }
-    }
-    _collationKeys = collationKeys;
-    _collationDirections = collationDirections;
-    _isSortOnSender = isSortOnSender;
-    _isSortOnReceiver = isSortOnReceiver;
-    _dataSchema = dataSchema;
-    if (CollectionUtils.isEmpty(collationKeys) || !_isSortOnReceiver) {
-      _priorityQueue = null;
-    } else {
-      _priorityQueue = new PriorityQueue<>(new SortUtils.SortComparator(collationKeys, collationDirections,
-          dataSchema, false));
-    }
-    _upstreamErrorBlock = null;
-    _serverIdx = 0;
-    _isSortedBlockConstructed = false;
-  }
-
-  public List<MailboxIdentifier> getSendingMailbox() {
-    return _sendingMailbox;
-  }
-
-  @Override
-  public List<MultiStageOperator> getChildOperators() {
-    return ImmutableList.of();
+      RelDistribution.Type exchangeType, int senderStageId, int receiverStageId, Long timeoutMs) {
+    super(context, sendingStageInstances, exchangeType, senderStageId, receiverStageId, timeoutMs);
   }
 
   @Nullable
@@ -178,7 +63,6 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   @Override
   protected TransferableBlock getNextBlock() {
     if (_upstreamErrorBlock != null) {
-      cleanUpResourcesOnError();
       return _upstreamErrorBlock;
     } else if (System.nanoTime() >= _deadlineTimestampNano) {
       return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
@@ -201,20 +85,12 @@ public class MailboxReceiveOperator extends MultiStageOperator {
           // Get null block when pulling times out from mailbox.
           if (block != null) {
             if (block.isErrorBlock()) {
-              cleanUpResourcesOnError();
               _upstreamErrorBlock =
                   TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
               return _upstreamErrorBlock;
             }
             if (!block.isEndOfStreamBlock()) {
-              if (_priorityQueue != null) {
-                // Ordering is enabled, add rows to the PriorityQueue
-                List<Object[]> container = block.getContainer();
-                _priorityQueue.addAll(container);
-              } else {
-                // Ordering is not enabled, return the input block as is
-                return block;
-              }
+              return block;
             } else {
               if (_opChainStats != null && !block.getResultMetadata().isEmpty()) {
                 for (Map.Entry<String, OperatorStats> entry : block.getResultMetadata().entrySet()) {
@@ -231,18 +107,6 @@ public class MailboxReceiveOperator extends MultiStageOperator {
       }
     }
 
-    if (((openMailboxCount == 0) || (openMailboxCount <= eosMailboxCount))
-        && (!CollectionUtils.isEmpty(_priorityQueue)) && !_isSortedBlockConstructed) {
-      // Some data is present in the PriorityQueue, these need to be sent upstream
-      LinkedList<Object[]> rows = new LinkedList<>();
-      while (_priorityQueue.size() > 0) {
-        Object[] row = _priorityQueue.poll();
-        rows.addFirst(row);
-      }
-      _isSortedBlockConstructed = true;
-      return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
-    }
-
     // there are two conditions in which we should return EOS: (1) there were
     // no mailboxes to open (this shouldn't happen because the second condition
     // should be hit first, but is defensive) (2) every mailbox that was opened
@@ -253,30 +117,4 @@ public class MailboxReceiveOperator extends MultiStageOperator {
             : TransferableBlockUtils.getEndOfStreamTransferableBlock();
     return block;
   }
-
-  private void cleanUpResourcesOnError() {
-    if (_priorityQueue != null) {
-      _priorityQueue.clear();
-    }
-  }
-
-  public boolean hasCollationKeys() {
-    return !CollectionUtils.isEmpty(_collationKeys);
-  }
-
-  @Override
-  public void close() {
-    super.close();
-    for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
-      _mailboxService.releaseReceivingMailbox(sendingMailbox);
-    }
-  }
-
-  @Override
-  public void cancel(Throwable t) {
-    super.cancel(t);
-    for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
-      _mailboxService.releaseReceivingMailbox(sendingMailbox);
-    }
-  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 7668dc0397..1350242508 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -80,8 +80,10 @@ public class SortOperator extends MultiStageOperator {
       _priorityQueue = null;
       _rows = new ArrayList<>();
     } else {
+      // Use the opposite direction as specified by the collation directions since we need the PriorityQueue to decide
+      // which elements to keep and which to remove based on the limits.
       _priorityQueue = new PriorityQueue<>(_numRowsToKeep,
-          new SortUtils.SortComparator(collationKeys, collationDirections, dataSchema, false));
+          new SortUtils.SortComparator(collationKeys, collationDirections, dataSchema, false, true));
       _rows = null;
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
new file mode 100644
index 0000000000..b565ed2d4c
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import javax.annotation.Nullable;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.utils.SortUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This {@code SortedMailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serves it out from the
+ * {@link MultiStageOperator#getNextBlock()} API in a sorted manner.
+ *
+ *  TODO: Once sorting on the {@code MailboxSendOperator} is available, modify this to use a k-way merge instead of
+ *        resorting via the PriorityQueue.
+ */
+public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SortedMailboxReceiveOperator.class);
+  private static final String EXPLAIN_NAME = "SORTED_MAILBOX_RECEIVE";
+
+  private final List<RexExpression> _collationKeys;
+  private final List<RelFieldCollation.Direction> _collationDirections;
+  private final boolean _isSortOnSender;
+  private final boolean _isSortOnReceiver;
+  private final DataSchema _dataSchema;
+  private final PriorityQueue<Object[]> _priorityQueue;
+  private boolean _isSortedBlockConstructed;
+
+  public SortedMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
+      List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender,
+      boolean isSortOnReceiver, DataSchema dataSchema, int senderStageId, int receiverStageId) {
+    this(context, context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, collationKeys,
+        collationDirections, isSortOnSender, isSortOnReceiver, dataSchema, senderStageId,
+        receiverStageId, context.getTimeoutMs());
+  }
+
+  // TODO: Move deadlineInNanoSeconds to OperatorContext.
+  // TODO: Remove boxed timeoutMs value from here and use long deadlineMs from context.
+  public SortedMailboxReceiveOperator(OpChainExecutionContext context, List<VirtualServer> sendingStageInstances,
+      RelDistribution.Type exchangeType, List<RexExpression> collationKeys,
+      List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender, boolean isSortOnReceiver,
+      DataSchema dataSchema, int senderStageId, int receiverStageId, Long timeoutMs) {
+    super(context, sendingStageInstances, exchangeType, senderStageId, receiverStageId, timeoutMs);
+    _collationKeys = collationKeys;
+    _collationDirections = collationDirections;
+    _isSortOnSender = isSortOnSender;
+    _isSortOnReceiver = isSortOnReceiver;
+    _dataSchema = dataSchema;
+    Preconditions.checkState(!CollectionUtils.isEmpty(collationKeys) && isSortOnReceiver,
+        "Collation keys should exist and sorting must be enabled otherwise use non-sorted MailboxReceiveOperator");
+    // No need to switch the direction since all rows will be stored in the priority queue without applying limits
+    _priorityQueue = new PriorityQueue<>(new SortUtils.SortComparator(collationKeys, collationDirections,
+        dataSchema, false, false));
+    _isSortedBlockConstructed = false;
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    if (_upstreamErrorBlock != null) {
+      return _upstreamErrorBlock;
+    } else if (System.nanoTime() >= _deadlineTimestampNano) {
+      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
+    }
+
+    int startingIdx = _serverIdx;
+    int openMailboxCount = 0;
+    int eosMailboxCount = 0;
+    boolean foundNonNullTransferableBlock;
+    // For all non-singleton distributions, we poll from every instance to check mailbox content.
+    // Since this operator needs to wait for all incoming data before it can return the sorted data
+    // to its caller, this operator will keep trying to poll from all mailboxes until it only
+    // receives null blocks or EOS for all mailboxes. This operator does not need to yield itself
+    // for backpressure at the moment, but once support is added for k-way merge when input data is
+    // sorted, this operator will have to return some data blocks without waiting for all the data.
+    // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content.
+    do {
+      // Reset the following for each loop
+      foundNonNullTransferableBlock = false;
+      openMailboxCount = 0;
+      eosMailboxCount = 0;
+      for (int i = 0; i < _sendingMailbox.size(); i++) {
+        // this implements a round-robin mailbox iterator, so we don't starve any mailboxes
+        _serverIdx = (startingIdx + i) % _sendingMailbox.size();
+        MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
+        try {
+          ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId);
+          if (!mailbox.isClosed()) {
+            openMailboxCount++;
+            TransferableBlock block = mailbox.receive();
+            // Get null block when pulling times out from mailbox.
+            if (block != null) {
+              foundNonNullTransferableBlock = true;
+              if (block.isErrorBlock()) {
+                _upstreamErrorBlock =
+                    TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
+                return _upstreamErrorBlock;
+              }
+              if (!block.isEndOfStreamBlock()) {
+                // Add rows to the PriorityQueue to order them
+                List<Object[]> container = block.getContainer();
+                _priorityQueue.addAll(container);
+              } else {
+                if (_opChainStats != null && !block.getResultMetadata().isEmpty()) {
+                  for (Map.Entry<String, OperatorStats> entry : block.getResultMetadata().entrySet()) {
+                    _opChainStats.getOperatorStatsMap().compute(entry.getKey(), (_key, _value) -> entry.getValue());
+                  }
+                }
+                eosMailboxCount++;
+              }
+            }
+          }
+        } catch (Exception e) {
+          return TransferableBlockUtils.getErrorTransferableBlock(
+              new RuntimeException(String.format("Error polling mailbox=%s", mailboxId), e));
+        }
+      }
+    } while (foundNonNullTransferableBlock && ((openMailboxCount > 0) && (openMailboxCount > eosMailboxCount)));
+
+    if (((openMailboxCount == 0) || (openMailboxCount == eosMailboxCount))
+        && (!CollectionUtils.isEmpty(_priorityQueue)) && !_isSortedBlockConstructed) {
+      // Some data is present in the PriorityQueue; these rows need to be returned to the caller in sorted order
+      List<Object[]> rows = new ArrayList<>();
+      while (_priorityQueue.size() > 0) {
+        Object[] row = _priorityQueue.poll();
+        rows.add(row);
+      }
+      _isSortedBlockConstructed = true;
+      return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
+    }
+
+    // there are two conditions in which we should return EOS: (1) there were
+    // no mailboxes to open (this shouldn't happen because the second condition
+    // should be hit first, but is defensive) (2) every mailbox that was opened
+    // returned an EOS block. in every other scenario, there are mailboxes that
+    // are not yet exhausted and we should wait for more data to be available
+    TransferableBlock block =
+        openMailboxCount > 0 && openMailboxCount > eosMailboxCount ? TransferableBlockUtils.getNoOpTransferableBlock()
+            : TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    return block;
+  }
+
+  private void cleanUpResources() {
+    if (_priorityQueue != null) {
+      _priorityQueue.clear();
+    }
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    cleanUpResources();
+  }
+
+  @Override
+  public void cancel(Throwable t) {
+    super.cancel(t);
+    cleanUpResources();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
index 820402f360..adc00d9c78 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
@@ -36,8 +36,16 @@ public class SortUtils {
     private final int[] _multipliers;
     private final boolean[] _useDoubleComparison;
 
+    /**
+     * Sort comparator for use with priority queues
+     * @param collationKeys collation keys to sort on
+     * @param collationDirections collation direction for each collation key to sort on
+     * @param dataSchema data schema to use
+     * @param isNullHandlingEnabled 'true' if null handling is enabled. Not supported yet
+     * @param switchDirections 'true' if the opposite of the specified sort direction should be used
+     */
     public SortComparator(List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
-        DataSchema dataSchema, boolean isNullHandlingEnabled) {
+        DataSchema dataSchema, boolean isNullHandlingEnabled, boolean switchDirections) {
       DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
       _size = collationKeys.size();
       _valueIndices = new int[_size];
@@ -45,7 +53,8 @@ public class SortUtils {
       _useDoubleComparison = new boolean[_size];
       for (int i = 0; i < _size; i++) {
         _valueIndices[i] = ((RexExpression.InputRef) collationKeys.get(i)).getIndex();
-        _multipliers[i] = collationDirections.get(i).isDescending() ? 1 : -1;
+        _multipliers[i] = switchDirections ? (collationDirections.get(i).isDescending() ? 1 : -1)
+            : (collationDirections.get(i).isDescending() ? -1 : 1);
         _useDoubleComparison[i] = columnDataTypes[_valueIndices[i]].isNumber();
       }
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index ca89173205..1890f1306c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -39,6 +39,7 @@ import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.operator.SortOperator;
+import org.apache.pinot.query.runtime.operator.SortedMailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.TransformOperator;
 import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
 
@@ -61,12 +62,20 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
 
   @Override
   public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PlanRequestContext context) {
-    MailboxReceiveOperator mailboxReceiveOperator =
-        new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
-            node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(), node.isSortOnReceiver(),
-            node.getDataSchema(), node.getSenderStageId(), node.getStageId());
-    context.addReceivingMailboxes(mailboxReceiveOperator.getSendingMailbox());
-    return mailboxReceiveOperator;
+    if (node.isSortOnReceiver()) {
+      SortedMailboxReceiveOperator sortedMailboxReceiveOperator =
+          new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
+              node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(), node.isSortOnReceiver(),
+              node.getDataSchema(), node.getSenderStageId(), node.getStageId());
+      context.addReceivingMailboxes(sortedMailboxReceiveOperator.getSendingMailbox());
+      return sortedMailboxReceiveOperator;
+    } else {
+      MailboxReceiveOperator mailboxReceiveOperator =
+          new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
+              node.getSenderStageId(), node.getStageId());
+      context.addReceivingMailboxes(mailboxReceiveOperator.getSendingMailbox());
+      return mailboxReceiveOperator;
+    }
   }
 
   @Override
@@ -122,8 +131,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
   @Override
   public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) {
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
-    boolean isInputSorted =
-        nextOperator instanceof MailboxReceiveOperator && ((MailboxReceiveOperator) nextOperator).hasCollationKeys();
+    boolean isInputSorted = nextOperator instanceof SortedMailboxReceiveOperator;
     return new SortOperator(context.getOpChainExecutionContext(), nextOperator, node.getCollationKeys(),
         node.getCollationDirections(), node.getFetch(), node.getOffset(), node.getDataSchema(), isInputSorted);
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 43345b3b08..011fed8472 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.service.dispatch;
 import com.google.common.annotations.VisibleForTesting;
 import io.grpc.Deadline;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -291,8 +290,7 @@ public class QueryDispatcher {
       DataSchema dataSchema, OpChainExecutionContext context) {
     // timeout is set for reduce stage
     MailboxReceiveOperator mailboxReceiveOperator =
-        new MailboxReceiveOperator(context, RelDistribution.Type.RANDOM_DISTRIBUTED, Collections.emptyList(),
-            Collections.emptyList(), false, false, dataSchema, stageId, reducerStageId);
+        new MailboxReceiveOperator(context, RelDistribution.Type.RANDOM_DISTRIBUTED, stageId, reducerStageId);
     return mailboxReceiveOperator;
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index bfbb977e7c..7e932067e1 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -20,20 +20,16 @@ package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
-import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.routing.VirtualServer;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -48,7 +44,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
-import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.STRING;
 
 
 public class MailboxReceiveOperatorTest {
@@ -88,24 +83,12 @@ public class MailboxReceiveOperatorTest {
   @Test
   public void shouldTimeoutOnExtraLongSleep()
       throws InterruptedException {
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     // shorter timeoutMs should result in error.
-    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 10L, 10L,
             new HashMap<>());
     MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
-            collationDirections, false, sortOnReceiver, inSchema, 456, 789, 10L);
+        new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 10L);
     Thread.sleep(200L);
     TransferableBlock mailbox = receiveOp.nextBlock();
     Assert.assertTrue(mailbox.isErrorBlock());
@@ -115,15 +98,13 @@ public class MailboxReceiveOperatorTest {
     // longer timeout or default timeout (10s) doesn't result in error.
     context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 2000L, 2000L,
         new HashMap<>());
-    receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
-        collationDirections, false, sortOnReceiver, inSchema, 456, 789, 2000L);
+    receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 2000L);
     Thread.sleep(200L);
     mailbox = receiveOp.nextBlock();
     Assert.assertFalse(mailbox.isErrorBlock());
     context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
         Long.MAX_VALUE, new HashMap<>());
-    receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
-        collationDirections, false, sortOnReceiver, inSchema, 456, 789, null);
+    receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, null);
     Thread.sleep(200L);
     mailbox = receiveOp.nextBlock();
     Assert.assertFalse(mailbox.isErrorBlock());
@@ -142,24 +123,12 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_server2.getHostname()).thenReturn("singleton");
     Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
 
-    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, 456, 789, null);
+            456, 789, null);
   }
 
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
@@ -173,24 +142,11 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_server2.getHostname()).thenReturn("singleton");
     Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
 
-    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
     MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.RANGE_DISTRIBUTED, collationKeys, collationDirections, false, sortOnReceiver, inSchema,
-        456, 789, null);
+        RelDistribution.Type.RANGE_DISTRIBUTED, 456, 789, null);
   }
 
   @Test
@@ -214,26 +170,13 @@ public class MailboxReceiveOperatorTest {
     String toHost = "toHost";
     VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
 
-    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
 
     // Receive end of stream block directly when there is no match.
     Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -265,26 +208,13 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(true);
 
-    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
 
     // Receive end of stream block directly when mailbox is close.
     Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -319,26 +249,13 @@ public class MailboxReceiveOperatorTest {
     // Receive null mailbox during timeout.
     Mockito.when(_mailbox.receive()).thenReturn(null);
 
-    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
     // Receive NoOpBlock.
     Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());
   }
@@ -371,26 +288,13 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
     Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
-    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
     // Receive EosBloc.
     Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
   }
@@ -426,25 +330,13 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
-
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     while (receivedBlock.isNoOpBlock()) {
       receivedBlock = receiveOp.nextBlock();
@@ -484,24 +376,13 @@ public class MailboxReceiveOperatorTest {
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
     Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(e));
 
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     Assert.assertTrue(receivedBlock.isErrorBlock());
     MetadataBlock error = (MetadataBlock) receivedBlock.getDataBlock();
@@ -541,24 +422,13 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     while (receivedBlock.isNoOpBlock()) {
       receivedBlock = receiveOp.nextBlock();
@@ -602,24 +472,13 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     while (receivedBlock.isNoOpBlock()) {
       receivedBlock = receiveOp.nextBlock();
@@ -671,8 +530,7 @@ public class MailboxReceiveOperatorTest {
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            Collections.emptyList(), Collections.emptyList(), false, false, inSchema, stageId,
-            DEFAULT_RECEIVER_STAGE_ID, null);
+            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
     // Receive first block from first server.
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     List<Object[]> resultRows = receivedBlock.getContainer();
@@ -725,24 +583,13 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
 
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
     // Receive error block from first server.
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     Assert.assertTrue(receivedBlock.isErrorBlock());
@@ -783,190 +630,14 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
 
-    // Choose whether to exercise the ordering code path or not
-    List<RexExpression> collationKeys = new ArrayList<>();
-    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
-
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
     MailboxReceiveOperator receiveOp =
         new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+            stageId, DEFAULT_RECEIVER_STAGE_ID, null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     Assert.assertTrue(receivedBlock.isErrorBlock(), "server-1 should have returned an error-block");
   }
-
-  @Test
-  public void shouldReceiveMailboxFromTwoServersWithCollationKey()
-      throws Exception {
-    String server1Host = "hash1";
-    int server1Port = 123;
-    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
-    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
-
-    String server2Host = "hash2";
-    int server2Port = 456;
-    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
-    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
-
-    int jobId = 456;
-    int stageId = 0;
-    int toPort = 8888;
-    String toHost = "toHost";
-    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
-
-    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-    JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-        new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
-    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
-    Mockito.when(_mailbox.isClosed()).thenReturn(false);
-    Object[] expRow1 = new Object[]{3, 3};
-    Object[] expRow2 = new Object[]{1, 1};
-    Mockito.when(_mailbox.receive())
-        .thenReturn(OperatorTestUtil.block(inSchema, expRow1), OperatorTestUtil.block(inSchema, expRow2),
-            TransferableBlockUtils.getEndOfStreamTransferableBlock());
-
-    Object[] expRow3 = new Object[]{4, 2};
-    Object[] expRow4 = new Object[]{2, 4};
-    Object[] expRow5 = new Object[]{-1, 95};
-    JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-        new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
-    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
-    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
-    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3),
-        OperatorTestUtil.block(inSchema, expRow4), OperatorTestUtil.block(inSchema, expRow5),
-        TransferableBlockUtils.getEndOfStreamTransferableBlock());
-
-    // Setup the collation key and direction
-    List<RexExpression> collationKeys = new ArrayList<>(Collections.singletonList(new RexExpression.InputRef(0)));
-    RelFieldCollation.Direction direction = _random.nextBoolean() ? RelFieldCollation.Direction.ASCENDING
-        : RelFieldCollation.Direction.DESCENDING;
-    List<RelFieldCollation.Direction> collationDirection = new ArrayList<>(Collections.singletonList(direction));
-
-    OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
-
-    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection, false, true, inSchema, stageId,
-        DEFAULT_RECEIVER_STAGE_ID, null);
-
-    // Receive a set of no-op blocks and skip over them
-    TransferableBlock receivedBlock = receiveOp.nextBlock();
-    while (receivedBlock.isNoOpBlock()) {
-      receivedBlock = receiveOp.nextBlock();
-    }
-    List<Object[]> resultRows = receivedBlock.getContainer();
-    // All blocks should be returned together since ordering was required
-    Assert.assertEquals(resultRows.size(), 5);
-    if (direction == RelFieldCollation.Direction.ASCENDING) {
-      Assert.assertEquals(resultRows.get(0), expRow5);
-      Assert.assertEquals(resultRows.get(1), expRow2);
-      Assert.assertEquals(resultRows.get(2), expRow4);
-      Assert.assertEquals(resultRows.get(3), expRow1);
-      Assert.assertEquals(resultRows.get(4), expRow3);
-    } else {
-      Assert.assertEquals(resultRows.get(0), expRow3);
-      Assert.assertEquals(resultRows.get(1), expRow1);
-      Assert.assertEquals(resultRows.get(2), expRow4);
-      Assert.assertEquals(resultRows.get(3), expRow2);
-      Assert.assertEquals(resultRows.get(4), expRow5);
-    }
-
-    receivedBlock = receiveOp.nextBlock();
-    Assert.assertTrue(receivedBlock.isEndOfStreamBlock());
-  }
-
-  @Test
-  public void shouldReceiveMailboxFromTwoServersWithCollationKeyTwoColumns()
-      throws Exception {
-    String server1Host = "hash1";
-    int server1Port = 123;
-    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
-    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
-
-    String server2Host = "hash2";
-    int server2Port = 456;
-    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
-    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
-
-    int jobId = 456;
-    int stageId = 0;
-    int toPort = 8888;
-    String toHost = "toHost";
-    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
-
-    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2", "col3"},
-        new DataSchema.ColumnDataType[]{INT, INT, STRING});
-    JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-        new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
-    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
-    Mockito.when(_mailbox.isClosed()).thenReturn(false);
-    Object[] expRow1 = new Object[]{3, 3, "queen"};
-    Object[] expRow2 = new Object[]{1, 1, "pink floyd"};
-    Mockito.when(_mailbox.receive())
-        .thenReturn(OperatorTestUtil.block(inSchema, expRow1), OperatorTestUtil.block(inSchema, expRow2),
-            TransferableBlockUtils.getEndOfStreamTransferableBlock());
-
-    Object[] expRow3 = new Object[]{42, 2, "pink floyd"};
-    Object[] expRow4 = new Object[]{2, 4, "aerosmith"};
-    Object[] expRow5 = new Object[]{-1, 95, "foo fighters"};
-    JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-        new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
-    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
-    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
-    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3),
-        OperatorTestUtil.block(inSchema, expRow4), OperatorTestUtil.block(inSchema, expRow5),
-        TransferableBlockUtils.getEndOfStreamTransferableBlock());
-
-    // Setup the collation key and direction
-    List<RexExpression> collationKeys = new ArrayList<>(Arrays.asList(new RexExpression.InputRef(2),
-        new RexExpression.InputRef(0)));
-    RelFieldCollation.Direction direction1 = _random.nextBoolean() ? RelFieldCollation.Direction.ASCENDING
-        : RelFieldCollation.Direction.DESCENDING;
-    RelFieldCollation.Direction direction2 = RelFieldCollation.Direction.ASCENDING;
-    List<RelFieldCollation.Direction> collationDirection = new ArrayList<>(Arrays.asList(direction1, direction2));
-
-    OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
-            Long.MAX_VALUE, new HashMap<>());
-
-    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection, false, true, inSchema, stageId,
-        DEFAULT_RECEIVER_STAGE_ID, null);
-
-    // Receive a set of no-op blocks and skip over them
-    TransferableBlock receivedBlock = receiveOp.nextBlock();
-    while (receivedBlock.isNoOpBlock()) {
-      receivedBlock = receiveOp.nextBlock();
-    }
-    List<Object[]> resultRows = receivedBlock.getContainer();
-    // All blocks should be returned together since ordering was required
-    Assert.assertEquals(resultRows.size(), 5);
-    if (direction1 == RelFieldCollation.Direction.ASCENDING) {
-      Assert.assertEquals(resultRows.get(0), expRow4);
-      Assert.assertEquals(resultRows.get(1), expRow5);
-      Assert.assertEquals(resultRows.get(2), expRow2);
-      Assert.assertEquals(resultRows.get(3), expRow3);
-      Assert.assertEquals(resultRows.get(4), expRow1);
-    } else {
-      Assert.assertEquals(resultRows.get(0), expRow1);
-      Assert.assertEquals(resultRows.get(1), expRow2);
-      Assert.assertEquals(resultRows.get(2), expRow3);
-      Assert.assertEquals(resultRows.get(3), expRow5);
-      Assert.assertEquals(resultRows.get(4), expRow4);
-    }
-
-    receivedBlock = receiveOp.nextBlock();
-    Assert.assertTrue(receivedBlock.isEndOfStreamBlock());
-  }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
similarity index 77%
copy from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
copy to pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index bfbb977e7c..659f7e8860 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -51,7 +51,7 @@ import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
 import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.STRING;
 
 
-public class MailboxReceiveOperatorTest {
+public class SortedMailboxReceiveOperatorTest {
 
   private static final int DEFAULT_RECEIVER_STAGE_ID = 10;
 
@@ -88,24 +88,19 @@ public class MailboxReceiveOperatorTest {
   @Test
   public void shouldTimeoutOnExtraLongSleep()
       throws InterruptedException {
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     // shorter timeoutMs should result in error.
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 10L, 10L,
             new HashMap<>());
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
-            collationDirections, false, sortOnReceiver, inSchema, 456, 789, 10L);
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
+            collationDirections, false, true, inSchema, 456, 789, 10L);
     Thread.sleep(200L);
     TransferableBlock mailbox = receiveOp.nextBlock();
     Assert.assertTrue(mailbox.isErrorBlock());
@@ -115,15 +110,15 @@ public class MailboxReceiveOperatorTest {
     // longer timeout or default timeout (10s) doesn't result in error.
     context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 2000L, 2000L,
         new HashMap<>());
-    receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
-        collationDirections, false, sortOnReceiver, inSchema, 456, 789, 2000L);
+    receiveOp = new SortedMailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON,
+        collationKeys, collationDirections, false, true, inSchema, 456, 789, 2000L);
     Thread.sleep(200L);
     mailbox = receiveOp.nextBlock();
     Assert.assertFalse(mailbox.isErrorBlock());
     context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
         Long.MAX_VALUE, new HashMap<>());
-    receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
-        collationDirections, false, sortOnReceiver, inSchema, 456, 789, null);
+    receiveOp = new SortedMailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON,
+        collationKeys, collationDirections, false, true, inSchema, 456, 789, null);
     Thread.sleep(200L);
     mailbox = receiveOp.nextBlock();
     Assert.assertFalse(mailbox.isErrorBlock());
@@ -144,22 +139,17 @@ public class MailboxReceiveOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
 
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, 456, 789, null);
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+            collationKeys, collationDirections, false, true, inSchema, 456, 789, null);
   }
 
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
@@ -175,22 +165,17 @@ public class MailboxReceiveOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
 
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
-    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.RANGE_DISTRIBUTED, collationKeys, collationDirections, false, sortOnReceiver, inSchema,
-        456, 789, null);
+    SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
+        ImmutableList.of(_server1, _server2), RelDistribution.Type.RANGE_DISTRIBUTED, collationKeys,
+        collationDirections, false, true, inSchema, 456, 789, null);
   }
 
   @Test
@@ -216,23 +201,18 @@ public class MailboxReceiveOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
 
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+            collationKeys, collationDirections, false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
             null);
 
     // Receive end of stream block directly when there is no match.
@@ -267,23 +247,18 @@ public class MailboxReceiveOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
 
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+            collationKeys, collationDirections, false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
             null);
 
     // Receive end of stream block directly when mailbox is close.
@@ -321,23 +296,18 @@ public class MailboxReceiveOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
 
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+            collationKeys, collationDirections, false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
             null);
     // Receive NoOpBlock.
     Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());
@@ -373,23 +343,18 @@ public class MailboxReceiveOperatorTest {
 
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
 
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+            collationKeys, collationDirections, false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
             null);
     // Receive EosBloc.
     Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -426,24 +391,18 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
-
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+            collationKeys, collationDirections, false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
             null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     while (receivedBlock.isNoOpBlock()) {
@@ -484,23 +443,18 @@ public class MailboxReceiveOperatorTest {
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
     Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(e));
 
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+            collationKeys, collationDirections, false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
             null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     Assert.assertTrue(receivedBlock.isErrorBlock());
@@ -508,8 +462,9 @@ public class MailboxReceiveOperatorTest {
     Assert.assertTrue(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("errorBlock"));
   }
 
-  @Test
-  public void shouldReceiveMailboxFromTwoServersOneClose()
+  @Test(expectedExceptions = IllegalStateException.class,
+      expectedExceptionsMessageRegExp = ".*Collation keys should exist.*")
+  public void shouldThrowOnEmptyCollationKey()
       throws Exception {
     String server1Host = "hash1";
     int server1Port = 123;
@@ -527,49 +482,60 @@ public class MailboxReceiveOperatorTest {
     String toHost = "toHost";
     VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
 
-    JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-        new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
-    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
-    Mockito.when(_mailbox.isClosed()).thenReturn(true);
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+
+    OpChainExecutionContext context =
+        new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
+            Long.MAX_VALUE, new HashMap<>());
+
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+            RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirections, false, true, inSchema, stageId,
+            DEFAULT_RECEIVER_STAGE_ID, null);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class,
+      expectedExceptionsMessageRegExp = ".*sorting must be enabled.*")
+  public void shouldThrowOnShouldSortOnReceiverFalse()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+    VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
 
-    JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
-        new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
-    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
-    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
-    Object[] expRow = new Object[]{1, 1};
     DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
-        TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
-    TransferableBlock receivedBlock = receiveOp.nextBlock();
-    while (receivedBlock.isNoOpBlock()) {
-      receivedBlock = receiveOp.nextBlock();
-    }
-    List<Object[]> resultRows = receivedBlock.getContainer();
-    Assert.assertEquals(resultRows.size(), 1);
-    Assert.assertEquals(resultRows.get(0), expRow);
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+            RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirections, false, false, inSchema, stageId,
+            DEFAULT_RECEIVER_STAGE_ID, null);
   }
 
   @Test
-  public void shouldReceiveMailboxFromTwoServersOneNull()
+  public void shouldReceiveMailboxFromTwoServersOneClose()
       throws Exception {
     String server1Host = "hash1";
     int server1Port = 123;
@@ -590,8 +556,7 @@ public class MailboxReceiveOperatorTest {
     JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
         new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
-    Mockito.when(_mailbox.isClosed()).thenReturn(false);
-    Mockito.when(_mailbox.receive()).thenReturn(null, TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_mailbox.isClosed()).thenReturn(true);
 
     JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
         new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
@@ -602,24 +567,19 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+            RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirections, false, true, inSchema, stageId,
+            DEFAULT_RECEIVER_STAGE_ID, null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     while (receivedBlock.isNoOpBlock()) {
       receivedBlock = receiveOp.nextBlock();
@@ -630,7 +590,7 @@ public class MailboxReceiveOperatorTest {
   }
 
   @Test
-  public void shouldReceiveMailboxFromTwoServers()
+  public void shouldReceiveMailboxFromTwoServersOneNull()
       throws Exception {
     String server1Host = "hash1";
     int server1Port = 123;
@@ -648,47 +608,41 @@ public class MailboxReceiveOperatorTest {
     String toHost = "toHost";
     VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
 
-    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
     JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
         new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
     Mockito.when(_mailbox.isClosed()).thenReturn(false);
-    Object[] expRow1 = new Object[]{1, 1};
-    Object[] expRow2 = new Object[]{2, 2};
-    Mockito.when(_mailbox.receive())
-        .thenReturn(OperatorTestUtil.block(inSchema, expRow1), OperatorTestUtil.block(inSchema, expRow2),
-            TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_mailbox.receive()).thenReturn(null, TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
-    Object[] expRow3 = new Object[]{3, 3};
     JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
         new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
     Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
-    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
+    Object[] expRow = new Object[]{1, 1};
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
+        TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    List<RexExpression> collationKeys = new ArrayList<>();
+    List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            Collections.emptyList(), Collections.emptyList(), false, false, inSchema, stageId,
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+            RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirections, false, true, inSchema, stageId,
             DEFAULT_RECEIVER_STAGE_ID, null);
-    // Receive first block from first server.
     TransferableBlock receivedBlock = receiveOp.nextBlock();
+    while (receivedBlock.isNoOpBlock()) {
+      receivedBlock = receiveOp.nextBlock();
+    }
     List<Object[]> resultRows = receivedBlock.getContainer();
     Assert.assertEquals(resultRows.size(), 1);
-    Assert.assertEquals(resultRows.get(0), expRow1);
-    // Receive second block from first server.
-    receivedBlock = receiveOp.nextBlock();
-    resultRows = receivedBlock.getContainer();
-    Assert.assertEquals(resultRows.size(), 1);
-    Assert.assertEquals(resultRows.get(0), expRow2);
-
-    // Receive from second server.
-    receivedBlock = receiveOp.nextBlock();
-    resultRows = receivedBlock.getContainer();
-    Assert.assertEquals(resultRows.size(), 1);
-    Assert.assertEquals(resultRows.get(0), expRow3);
+    Assert.assertEquals(resultRows.get(0), expRow);
   }
 
   @Test
@@ -725,24 +679,19 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
 
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+            RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirections, false, true, inSchema, stageId,
+            DEFAULT_RECEIVER_STAGE_ID, null);
     // Receive error block from first server.
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     Assert.assertTrue(receivedBlock.isErrorBlock());
@@ -783,24 +732,19 @@ public class MailboxReceiveOperatorTest {
     Mockito.when(_mailbox2.isClosed()).thenReturn(false);
     Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
 
-    // Choose whether to exercise the ordering code path or not
     List<RexExpression> collationKeys = new ArrayList<>();
     List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
-    boolean sortOnReceiver = false;
-    if (_random.nextBoolean()) {
-      collationKeys.add(new RexExpression.InputRef(0));
-      collationDirections.add(RelFieldCollation.Direction.ASCENDING);
-      sortOnReceiver = true;
-    }
+    collationKeys.add(new RexExpression.InputRef(0));
+    collationDirections.add(RelFieldCollation.Direction.ASCENDING);
 
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp =
-        new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
-            collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
-            null);
+    SortedMailboxReceiveOperator receiveOp =
+        new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+            RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirections, false, true, inSchema, stageId,
+            DEFAULT_RECEIVER_STAGE_ID, null);
     TransferableBlock receivedBlock = receiveOp.nextBlock();
     Assert.assertTrue(receivedBlock.isErrorBlock(), "server-1 should have returned an error-block");
   }
@@ -856,9 +800,9 @@ public class MailboxReceiveOperatorTest {
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection, false, true, inSchema, stageId,
-        DEFAULT_RECEIVER_STAGE_ID, null);
+    SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
+        ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection,
+        false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID, null);
 
     // Receive a set of no-op blocks and skip over them
     TransferableBlock receivedBlock = receiveOp.nextBlock();
@@ -940,9 +884,9 @@ public class MailboxReceiveOperatorTest {
         new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
             Long.MAX_VALUE, new HashMap<>());
 
-    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
-        RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection, false, true, inSchema, stageId,
-        DEFAULT_RECEIVER_STAGE_ID, null);
+    SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
+        ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection,
+        false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID, null);
 
     // Receive a set of no-op blocks and skip over them
     TransferableBlock receivedBlock = receiveOp.nextBlock();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org