You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/10/19 15:16:22 UTC

[pinot] branch master updated: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance (#9484)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 510051a011 [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance (#9484)
510051a011 is described below

commit 510051a011114c3129acc8307275b7715f087c57
Author: Ankit Sultana <an...@uber.com>
AuthorDate: Wed Oct 19 20:46:15 2022 +0530

    [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance (#9484)
    
    Feature changes:
    1. `MailboxService` uses `TransferableBlock`. The serde to `MailboxContent` is moved within GrpcMailbox.
    2. Adds a new `InMemoryMailboxService`. If the stage is local, then the `TransferableBlock` can be passed from sender to receiver simply using a queue.
    3. Adds a `MultiplexingMailboxService` which chooses between the gRPC-based mailbox service and the in-memory mailbox service.
    
    Optimizations:
    1. This avoids MailboxContent serde if sender/receiver stages are local. This can save ~10% latency overhead
      - exact number varies depending on query pattern
      - most helpful if the stages deal with large data
    2. Saves memory. If we use gRPC mailbox then we'll copy the buffer to create MailboxContent.
    3. Makes it possible to avoid the `List<Object[]>` ==> `DataTable` ==> `List<Object[]>` serde together with #9561.
---
 .../MultiStageBrokerRequestHandler.java            |   8 +-
 .../pinot/query/mailbox/GrpcMailboxService.java    |  16 ++--
 .../pinot/query/mailbox/GrpcReceivingMailbox.java  |  54 +++++++++--
 .../pinot/query/mailbox/GrpcSendingMailbox.java    |  25 ++++-
 .../query/mailbox/InMemoryMailboxService.java      |  95 +++++++++++++++++++
 .../query/mailbox/InMemoryReceivingMailbox.java    |  66 +++++++++++++
 .../query/mailbox/InMemorySendingMailbox.java      |  56 +++++++++++
 .../pinot/query/mailbox/MailboxIdentifier.java     |   7 ++
 .../apache/pinot/query/mailbox/MailboxService.java |   4 +-
 .../query/mailbox/MultiplexingMailboxService.java  |  86 +++++++++++++++++
 .../pinot/query/mailbox/ReceivingMailbox.java      |   5 +-
 .../query/mailbox/StringMailboxIdentifier.java     |   5 +
 .../channel/MailboxContentStreamObserver.java      |  10 +-
 .../apache/pinot/query/runtime/QueryRunner.java    |   7 +-
 .../runtime/executor/PhysicalPlanVisitor.java      |   7 +-
 .../runtime/executor/WorkerQueryExecutor.java      |   5 +-
 .../runtime/operator/MailboxReceiveOperator.java   |  34 ++-----
 .../runtime/operator/MailboxSendOperator.java      |  37 ++------
 .../pinot/query/service/QueryDispatcher.java       |   5 +-
 .../query/mailbox/GrpcMailboxServiceTest.java      |  84 +++++++++--------
 .../query/mailbox/InMemoryMailboxServiceTest.java  | 104 +++++++++++++++++++++
 .../mailbox/MultiplexingMailboxServiceTest.java    |  90 ++++++++++++++++++
 22 files changed, 679 insertions(+), 131 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 238ce36570..0b1a67efbb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -34,7 +34,6 @@ import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
@@ -44,10 +43,11 @@ import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.catalog.PinotCatalog;
-import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.MultiplexingMailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.routing.WorkerManager;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.query.service.QueryDispatcher;
 import org.apache.pinot.query.type.TypeFactory;
@@ -67,7 +67,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
   private final String _reducerHostname;
   private final int _reducerPort;
 
-  private final MailboxService<Mailbox.MailboxContent> _mailboxService;
+  private final MailboxService<TransferableBlock> _mailboxService;
   private final QueryEnvironment _queryEnvironment;
   private final QueryDispatcher _queryDispatcher;
 
@@ -91,7 +91,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
         CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
         new WorkerManager(_reducerHostname, _reducerPort, routingManager));
     _queryDispatcher = new QueryDispatcher();
-    _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerPort, config);
+    _mailboxService = MultiplexingMailboxService.newInstance(_reducerHostname, _reducerPort, config);
 
     // TODO: move this to a startUp() function.
     _mailboxService.start();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
index bdc36a7571..f12a520c2e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -20,8 +20,8 @@ package org.apache.pinot.query.mailbox;
 
 import io.grpc.ManagedChannel;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
@@ -42,16 +42,16 @@ import org.apache.pinot.spi.env.PinotConfiguration;
  *   to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to distinguish the 2 different mailbox.</li>
  * </ul>
  */
-public class GrpcMailboxService implements MailboxService<MailboxContent> {
+public class GrpcMailboxService implements MailboxService<TransferableBlock> {
   // channel manager
   private final ChannelManager _channelManager;
   private final String _hostname;
   private final int _mailboxPort;
 
   // maintaining a list of registered mailboxes.
-  private final ConcurrentHashMap<String, ReceivingMailbox<MailboxContent>> _receivingMailboxMap =
+  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
       new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, SendingMailbox<MailboxContent>> _sendingMailboxMap =
+  private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
       new ConcurrentHashMap<>();
 
   public GrpcMailboxService(String hostname, int mailboxPort, PinotConfiguration extraConfig) {
@@ -84,16 +84,16 @@ public class GrpcMailboxService implements MailboxService<MailboxContent> {
    * Register a mailbox, mailbox needs to be registered before use.
    * @param mailboxId the id of the mailbox.
    */
-  public SendingMailbox<MailboxContent> getSendingMailbox(String mailboxId) {
-    return _sendingMailboxMap.computeIfAbsent(mailboxId, (mId) -> new GrpcSendingMailbox(mId, this));
+  public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) {
+    return _sendingMailboxMap.computeIfAbsent(mailboxId.toString(), (mId) -> new GrpcSendingMailbox(mId, this));
   }
 
   /**
    * Register a mailbox, mailbox needs to be registered before use.
    * @param mailboxId the id of the mailbox.
    */
-  public ReceivingMailbox<MailboxContent> getReceivingMailbox(String mailboxId) {
-    return _receivingMailboxMap.computeIfAbsent(mailboxId, (mId) -> new GrpcReceivingMailbox(mId, this));
+  public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
+    return _receivingMailboxMap.computeIfAbsent(mailboxId.toString(), (mId) -> new GrpcReceivingMailbox(mId, this));
   }
 
   public ManagedChannel getChannel(String mailboxId) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
index 2ce0e54a4e..3ce41f07b1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
@@ -18,17 +18,25 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.query.mailbox.channel.MailboxContentStreamObserver;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 
 
 /**
  * GRPC implementation of the {@link ReceivingMailbox}.
  */
-public class GrpcReceivingMailbox implements ReceivingMailbox<MailboxContent> {
+public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
   private static final long DEFAULT_MAILBOX_INIT_TIMEOUT = 100L;
   private final GrpcMailboxService _mailboxService;
   private final String _mailboxId;
@@ -50,15 +58,24 @@ public class GrpcReceivingMailbox implements ReceivingMailbox<MailboxContent> {
     }
   }
 
+  /**
+   * Polls the underlying channel and converts the received data into a TransferableBlock. This may return null in the
+   * following cases:
+   *
+   * <p>
+   *  1. If the mailbox hasn't initialized yet. This means we haven't received any data yet.
+   *  2. If the received block from the sender is a data-block with 0 rows.
+   * </p>
+   */
   @Override
-  public MailboxContent receive()
+  public TransferableBlock receive()
       throws Exception {
-    MailboxContent mailboxContent = null;
-    if (waitForInitialize()) {
-      mailboxContent = _contentStreamObserver.poll();
-      _totalMsgReceived.incrementAndGet();
+    if (!waitForInitialize()) {
+      return null;
     }
-    return mailboxContent;
+    MailboxContent mailboxContent = _contentStreamObserver.poll();
+    _totalMsgReceived.incrementAndGet();
+    return fromMailboxContent(mailboxContent);
   }
 
   @Override
@@ -85,4 +102,27 @@ public class GrpcReceivingMailbox implements ReceivingMailbox<MailboxContent> {
   public String getMailboxId() {
     return _mailboxId;
   }
+
+  /**
+   * Converts the data sent by a {@link GrpcSendingMailbox} to a {@link TransferableBlock}.
+   *
+   * @param mailboxContent data sent by a GrpcSendingMailbox.
+   * @return null if the received MailboxContent is a data-block with 0 rows.
+   * @throws IOException if the MailboxContent cannot be converted to a TransferableBlock.
+   */
+  @Nullable
+  private TransferableBlock fromMailboxContent(MailboxContent mailboxContent)
+      throws IOException {
+    ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
+    if (byteBuffer.hasRemaining()) {
+      BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
+      if (dataBlock instanceof MetadataBlock && !dataBlock.getExceptions().isEmpty()) {
+        return TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
+      }
+      if (dataBlock.getNumberOfRows() > 0) {
+        return new TransferableBlock(dataBlock);
+      }
+    }
+    return null;
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 506d66e01c..cffbd21e65 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -18,18 +18,25 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.protobuf.ByteString;
 import io.grpc.ManagedChannel;
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.channel.ChannelUtils;
 import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
 
 /**
  * GRPC implementation of the {@link SendingMailbox}.
  */
-public class GrpcSendingMailbox implements SendingMailbox<MailboxContent> {
+public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
   private final GrpcMailboxService _mailboxService;
   private final String _mailboxId;
   private final AtomicBoolean _initialized = new AtomicBoolean(false);
@@ -58,12 +65,13 @@ public class GrpcSendingMailbox implements SendingMailbox<MailboxContent> {
   }
 
   @Override
-  public void send(MailboxContent data)
+  public void send(TransferableBlock block)
       throws UnsupportedOperationException {
     if (!_initialized.get()) {
       // initialization is special
       init();
     }
+    MailboxContent data = toMailboxContent(block.getDataBlock());
     _statusStreamObserver.send(data);
     _totalMsgSent.incrementAndGet();
   }
@@ -77,4 +85,17 @@ public class GrpcSendingMailbox implements SendingMailbox<MailboxContent> {
   public String getMailboxId() {
     return _mailboxId;
   }
+
+  private MailboxContent toMailboxContent(BaseDataBlock dataBlock) {
+    try {
+      Mailbox.MailboxContent.Builder builder = Mailbox.MailboxContent.newBuilder().setMailboxId(_mailboxId)
+          .setPayload(ByteString.copyFrom(dataBlock.toBytes()));
+      if (dataBlock instanceof MetadataBlock) {
+        builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true");
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new RuntimeException("Error converting to mailbox content", e);
+    }
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
new file mode 100644
index 0000000000..8165c4a5f0
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryMailboxService implements MailboxService<TransferableBlock> {
+  // hostname and port identifying this mailbox service
+  private final String _hostname;
+  private final int _mailboxPort;
+  static final int DEFAULT_CHANNEL_CAPACITY = 5;
+  // TODO: This should come from a config and should be consistent with the timeout for GrpcMailboxService
+  static final int DEFAULT_CHANNEL_TIMEOUT_SECONDS = 1;
+
+  private final ConcurrentHashMap<String, InMemoryMailboxState> _mailboxStateMap = new ConcurrentHashMap<>();
+
+  public InMemoryMailboxService(String hostname, int mailboxPort) {
+    _hostname = hostname;
+    _mailboxPort = mailboxPort;
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void shutdown() {
+  }
+
+  @Override
+  public String getHostname() {
+    return _hostname;
+  }
+
+  @Override
+  public int getMailboxPort() {
+    return _mailboxPort;
+  }
+
+  public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) {
+    Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
+    String mId = mailboxId.toString();
+    return _mailboxStateMap.computeIfAbsent(mId, this::newMailboxState)._sendingMailbox;
+  }
+
+  public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
+    Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
+    String mId = mailboxId.toString();
+    return _mailboxStateMap.computeIfAbsent(mId, this::newMailboxState)._receivingMailbox;
+  }
+
+  InMemoryMailboxState newMailboxState(String mailboxId) {
+    BlockingQueue<TransferableBlock> queue = createDefaultChannel();
+    return new InMemoryMailboxState(new InMemorySendingMailbox(mailboxId, queue),
+        new InMemoryReceivingMailbox(mailboxId, queue), queue);
+  }
+
+  private ArrayBlockingQueue<TransferableBlock> createDefaultChannel() {
+    return new ArrayBlockingQueue<>(DEFAULT_CHANNEL_CAPACITY);
+  }
+
+  static class InMemoryMailboxState {
+    ReceivingMailbox<TransferableBlock> _receivingMailbox;
+    SendingMailbox<TransferableBlock> _sendingMailbox;
+    BlockingQueue<TransferableBlock> _queue;
+
+    InMemoryMailboxState(SendingMailbox<TransferableBlock> sendingMailbox,
+        ReceivingMailbox<TransferableBlock> receivingMailbox, BlockingQueue<TransferableBlock> queue) {
+      _receivingMailbox = receivingMailbox;
+      _sendingMailbox = sendingMailbox;
+      _queue = queue;
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
new file mode 100644
index 0000000000..536312aa02
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
+  private final String _mailboxId;
+  private final BlockingQueue<TransferableBlock> _queue;
+  private volatile boolean _closed;
+
+  public InMemoryReceivingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+    _closed = false;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public TransferableBlock receive()
+      throws Exception {
+    TransferableBlock block = _queue.poll(
+        InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    // If the poll timed out, we return a null since MailboxReceiveOperator can continue to check other mailboxes
+    if (block == null) {
+      return null;
+    }
+    if (block.isEndOfStreamBlock()) {
+      _closed = true;
+    }
+    return block;
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return true;
+  }
+
+  @Override
+  public boolean isClosed() {
+    return _closed && _queue.size() == 0;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
new file mode 100644
index 0000000000..40047a9f2d
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
+  private final BlockingQueue<TransferableBlock> _queue;
+  private final String _mailboxId;
+
+  public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public void send(TransferableBlock data)
+      throws UnsupportedOperationException {
+    try {
+      if (!_queue.offer(
+          data, InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        throw new RuntimeException(String.format("Timed out when sending block in mailbox=%s", _mailboxId));
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Interrupted trying to send data through the channel", e);
+    }
+  }
+
+  @Override
+  public void complete() {
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
index ee72494cdc..8646f508ae 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdentifier.java
@@ -54,4 +54,11 @@ public interface MailboxIdentifier {
    * @return receiver port
    */
   int getToPort();
+
+  /**
+   * Checks whether sender and receiver are in the same JVM.
+   *
+   * @return true if sender and receiver are in the same JVM.
+   */
+  boolean isLocal();
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index 1943b12efb..234dd78e98 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -58,7 +58,7 @@ public interface MailboxService<T> {
    * @param mailboxId mailbox identifier.
    * @return a receiving mailbox.
    */
-  ReceivingMailbox<T> getReceivingMailbox(String mailboxId);
+  ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId);
 
   /**
    * Look up a sending mailbox by {@link MailboxIdentifier}.
@@ -66,5 +66,5 @@ public interface MailboxService<T> {
    * @param mailboxId mailbox identifier.
    * @return a sending mailbox.
    */
-  SendingMailbox<T> getSendingMailbox(String mailboxId);
+  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId);
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
new file mode 100644
index 0000000000..2a7a49c742
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import com.clearspring.analytics.util.Preconditions;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+/**
+ * A wrapper over {@link GrpcMailboxService} and {@link InMemoryMailboxService} that can delegate the data-transfer
+ * to the in-memory mailbox service whenever possible.
+ */
+public class MultiplexingMailboxService implements MailboxService<TransferableBlock> {
+  private final GrpcMailboxService _grpcMailboxService;
+  // TODO: Add config to disable in memory mailbox.
+  private final InMemoryMailboxService _inMemoryMailboxService;
+
+  MultiplexingMailboxService(GrpcMailboxService grpcMailboxService,
+      InMemoryMailboxService inMemoryReceivingMailbox) {
+    Preconditions.checkState(grpcMailboxService.getHostname().equals(inMemoryReceivingMailbox.getHostname()));
+    Preconditions.checkState(grpcMailboxService.getMailboxPort() == inMemoryReceivingMailbox.getMailboxPort());
+    _grpcMailboxService = grpcMailboxService;
+    _inMemoryMailboxService = inMemoryReceivingMailbox;
+  }
+
+  @Override
+  public void start() {
+    _grpcMailboxService.start();
+    _inMemoryMailboxService.start();
+  }
+
+  @Override
+  public void shutdown() {
+    _grpcMailboxService.shutdown();
+    _inMemoryMailboxService.shutdown();
+  }
+
+  @Override
+  public String getHostname() {
+    return _grpcMailboxService.getHostname();
+  }
+
+  @Override
+  public int getMailboxPort() {
+    return _grpcMailboxService.getMailboxPort();
+  }
+
+  @Override
+  public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
+    if (mailboxId.isLocal()) {
+      return _inMemoryMailboxService.getReceivingMailbox(mailboxId);
+    }
+    return _grpcMailboxService.getReceivingMailbox(mailboxId);
+  }
+
+  @Override
+  public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) {
+    if (mailboxId.isLocal()) {
+      return _inMemoryMailboxService.getSendingMailbox(mailboxId);
+    }
+    return _grpcMailboxService.getSendingMailbox(mailboxId);
+  }
+
+  public static MultiplexingMailboxService newInstance(String hostname, int port,
+      PinotConfiguration pinotConfiguration) {
+    return new MultiplexingMailboxService(new GrpcMailboxService(hostname, port, pinotConfiguration),
+        new InMemoryMailboxService(hostname, port));
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index 9e17d827f2..ea999807d0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -28,14 +28,15 @@ package org.apache.pinot.query.mailbox;
 public interface ReceivingMailbox<T> {
 
   /**
-   * get the unique identifier for the mailbox.
+   * Get the unique identifier for the mailbox.
    *
    * @return Mailbox ID.
    */
   String getMailboxId();
 
   /**
-   * receive a data packet from the mailbox.
+   * Receive a data packet from the mailbox. Depending on the implementation, this may return null. The caller should
+   * use {@link ReceivingMailbox#isClosed()} to determine if the sender is done sending and the channel is closed.
    * @return data packet.
    * @throws Exception
    */
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java
index 21712273e2..c3d3078780 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/StringMailboxIdentifier.java
@@ -82,6 +82,11 @@ public class StringMailboxIdentifier implements MailboxIdentifier {
     return _toPort;
   }
 
+  @Override
+  public boolean isLocal() {
+    return _fromHost.equals(_toHost) && _fromPort == _toPort;
+  }
+
   @Override
   public String toString() {
     return _mailboxIdString;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 6d89f44130..7ae582c9a9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.mailbox.GrpcReceivingMailbox;
+import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +65,12 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
     _isEnabledFeedback = isEnabledFeedback;
   }
 
+  /**
+   * Polls for the next received content. This method may return null: if the receiver consumes the last block
+   * before the StreamObserver is detected as complete, the MailboxReceiveOperator calls this method again, and since
+   * no new content arrives, the loop exits once the stream completes. Returning null (rather than throwing) lets
+   * MailboxReceiveOperator treat this case as "no data" and not as an error.
+   */
   public Mailbox.MailboxContent poll() {
     while (!isCompleted()) {
       try {
@@ -85,7 +92,8 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
   @Override
   public void onNext(Mailbox.MailboxContent mailboxContent) {
     _mailboxId = mailboxContent.getMailboxId();
-    GrpcReceivingMailbox receivingMailbox = (GrpcReceivingMailbox) _mailboxService.getReceivingMailbox(_mailboxId);
+    GrpcReceivingMailbox receivingMailbox =
+        (GrpcReceivingMailbox) _mailboxService.getReceivingMailbox(new StringMailboxIdentifier(_mailboxId));
     receivingMailbox.init(this);
     if (!mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY)) {
       // when the receiving end receives a message put it in the mailbox queue.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index a19658c586..1ff15eb189 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -32,7 +32,6 @@ import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -41,8 +40,8 @@ import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.transport.ServerInstance;
-import org.apache.pinot.query.mailbox.GrpcMailboxService;
 import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.MultiplexingMailboxService;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -68,7 +67,7 @@ public class QueryRunner {
   private WorkerQueryExecutor _workerExecutor;
   private HelixManager _helixManager;
   private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
-  private MailboxService<Mailbox.MailboxContent> _mailboxService;
+  private MailboxService<TransferableBlock> _mailboxService;
   private String _hostname;
   private int _port;
 
@@ -84,7 +83,7 @@ public class QueryRunner {
     _port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
     _helixManager = helixManager;
     try {
-      _mailboxService = new GrpcMailboxService(_hostname, _port, config);
+      _mailboxService = MultiplexingMailboxService.newInstance(_hostname, _port, config);
       _serverExecutor = new ServerQueryExecutorV1Impl();
       _serverExecutor.init(config, instanceDataManager, serverMetrics);
       _workerExecutor = new WorkerQueryExecutor();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
index b4cc73d5ac..6af229aa78 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.runtime.executor;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -60,18 +59,18 @@ import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
  */
 public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<TransferableBlock>, Void> {
 
-  private final MailboxService<Mailbox.MailboxContent> _mailboxService;
+  private final MailboxService<TransferableBlock> _mailboxService;
   private final String _hostName;
   private final int _port;
   private final long _requestId;
   private final Map<Integer, StageMetadata> _metadataMap;
 
-  public static Operator<TransferableBlock> build(MailboxService<Mailbox.MailboxContent> mailboxService,
+  public static Operator<TransferableBlock> build(MailboxService<TransferableBlock> mailboxService,
       String hostName, int port, long requestId, Map<Integer, StageMetadata> metadataMap, StageNode node) {
     return node.visit(new PhysicalPlanVisitor(mailboxService, hostName, port, requestId, metadataMap), null);
   }
 
-  private PhysicalPlanVisitor(MailboxService<Mailbox.MailboxContent> mailboxService, String hostName, int port,
+  private PhysicalPlanVisitor(MailboxService<TransferableBlock> mailboxService, String hostName, int port,
       long requestId, Map<Integer, StageMetadata> metadataMap) {
     _mailboxService = mailboxService;
     _hostName = hostName;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 5fb7d05d69..08de5ca7e0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.runtime.executor;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.util.trace.TraceRunnable;
@@ -45,12 +44,12 @@ public class WorkerQueryExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(WorkerQueryExecutor.class);
   private PinotConfiguration _config;
   private ServerMetrics _serverMetrics;
-  private MailboxService<Mailbox.MailboxContent> _mailboxService;
+  private MailboxService<TransferableBlock> _mailboxService;
   private String _hostName;
   private int _port;
 
   public void init(PinotConfiguration config, ServerMetrics serverMetrics,
-      MailboxService<Mailbox.MailboxContent> mailboxService, String hostName, int port) {
+      MailboxService<TransferableBlock> mailboxService, String hostName, int port) {
     _config = config;
     _serverMetrics = serverMetrics;
     _mailboxService = mailboxService;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 8f549bdaa5..781eaccc2c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -19,20 +19,16 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Preconditions;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
-import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.ReceivingMailbox;
 import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
@@ -52,7 +48,7 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class);
   private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
 
-  private final MailboxService<Mailbox.MailboxContent> _mailboxService;
+  private final MailboxService<TransferableBlock> _mailboxService;
   private final RelDistribution.Type _exchangeType;
   private final KeySelector<Object[], Object[]> _keySelector;
   private final List<ServerInstance> _sendingStageInstances;
@@ -64,7 +60,7 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
   private final long _timeout;
   private TransferableBlock _upstreamErrorBlock;
 
-  public MailboxReceiveOperator(MailboxService<Mailbox.MailboxContent> mailboxService, DataSchema dataSchema,
+  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
       List<ServerInstance> sendingStageInstances, RelDistribution.Type exchangeType,
       KeySelector<Object[], Object[]> keySelector, String hostName, int port, long jobId, int stageId) {
     _dataSchema = dataSchema;
@@ -123,26 +119,16 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
       hasOpenedMailbox = false;
       for (ServerInstance sendingInstance : _sendingStageInstances) {
         try {
-          ReceivingMailbox<Mailbox.MailboxContent> receivingMailbox =
+          ReceivingMailbox<TransferableBlock> receivingMailbox =
               _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
           // TODO this is not threadsafe.
           // make sure only one thread is checking receiving mailbox and calling receive() then close()
           if (!receivingMailbox.isClosed()) {
             hasOpenedMailbox = true;
-            Mailbox.MailboxContent mailboxContent = receivingMailbox.receive();
-            if (mailboxContent != null) {
-              ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
-              if (byteBuffer.hasRemaining()) {
-                BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
-                if (dataBlock instanceof MetadataBlock && !dataBlock.getExceptions().isEmpty()) {
-                  _upstreamErrorBlock = TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
-                  return _upstreamErrorBlock;
-                }
-                if (dataBlock.getNumberOfRows() > 0) {
-                  // here we only return data table block when it is not empty.
-                  return new TransferableBlock(dataBlock);
-                }
-              }
+            TransferableBlock transferableBlock = receivingMailbox.receive();
+            if (transferableBlock != null && !transferableBlock.isEndOfStreamBlock()) {
+              // Return the block only if it is non-null and not an end-of-stream block
+              return transferableBlock;
             }
           }
         } catch (Exception e) {
@@ -162,8 +148,8 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
     return _exchangeType;
   }
 
-  private String toMailboxId(ServerInstance serverInstance) {
+  private MailboxIdentifier toMailboxId(ServerInstance serverInstance) {
     return new StringMailboxIdentifier(String.format("%s_%s", _jobId, _stageId), serverInstance.getHostname(),
-        serverInstance.getQueryMailboxPort(), _hostName, _port).toString();
+        serverInstance.getQueryMailboxPort(), _hostName, _port);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index db3bbe8610..a53864ab56 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -20,8 +20,6 @@ package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -31,15 +29,14 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
-import org.apache.pinot.query.mailbox.channel.ChannelUtils;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -68,11 +65,11 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   private final int _serverPort;
   private final long _jobId;
   private final int _stageId;
-  private final MailboxService<Mailbox.MailboxContent> _mailboxService;
+  private final MailboxService<TransferableBlock> _mailboxService;
   private final DataSchema _dataSchema;
   private Operator<TransferableBlock> _dataTableBlockBaseOperator;
 
-  public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, DataSchema dataSchema,
+  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
       Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
       long jobId, int stageId) {
@@ -187,8 +184,7 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   }
 
   private void sendDataTableBlockToServers(List<ServerInstance> servers, TransferableBlock transferableBlock,
-      BaseDataBlock.Type type, boolean isEndOfStream)
-      throws IOException {
+      BaseDataBlock.Type type, boolean isEndOfStream) {
     if (isEndOfStream) {
       for (ServerInstance server : servers) {
         sendDataTableBlock(server, transferableBlock, true);
@@ -206,30 +202,17 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   }
 
   private void sendDataTableBlock(ServerInstance serverInstance, TransferableBlock transferableBlock,
-      boolean isEndOfStream)
-      throws IOException {
-    String mailboxId = toMailboxId(serverInstance);
-    SendingMailbox<Mailbox.MailboxContent> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId);
-    Mailbox.MailboxContent mailboxContent = toMailboxContent(mailboxId, transferableBlock.getDataBlock().toBytes(),
-        isEndOfStream);
-    sendingMailbox.send(mailboxContent);
+      boolean isEndOfStream) {
+    MailboxIdentifier mailboxId = toMailboxId(serverInstance);
+    SendingMailbox<TransferableBlock> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId);
+    sendingMailbox.send(transferableBlock);
     if (isEndOfStream) {
       sendingMailbox.complete();
     }
   }
 
-  private Mailbox.MailboxContent toMailboxContent(String mailboxId, byte[] dataBlockBytes, boolean isMetadataBlock)
-      throws IOException {
-    Mailbox.MailboxContent.Builder builder =
-        Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId).setPayload(ByteString.copyFrom(dataBlockBytes));
-    if (isMetadataBlock) {
-      builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true");
-    }
-    return builder.build();
-  }
-
-  private String toMailboxId(ServerInstance serverInstance) {
+  private StringMailboxIdentifier toMailboxId(ServerInstance serverInstance) {
     return new StringMailboxIdentifier(String.format("%s_%s", _jobId, _stageId), _serverHostName, _serverPort,
-        serverInstance.getHostname(), serverInstance.getQueryMailboxPort()).toString();
+        serverInstance.getHostname(), serverInstance.getQueryMailboxPort());
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 700881e04e..b29700dde0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -32,7 +32,6 @@ import org.apache.pinot.common.datablock.BaseDataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.response.broker.ResultTable;
@@ -65,7 +64,7 @@ public class QueryDispatcher {
   }
 
   public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
-      MailboxService<Mailbox.MailboxContent> mailboxService, long timeoutNano)
+      MailboxService<TransferableBlock> mailboxService, long timeoutNano)
       throws Exception {
     // submit all the distributed stages.
     int reduceStageId = submit(requestId, queryPlan);
@@ -199,7 +198,7 @@ public class QueryDispatcher {
   }
 
   @VisibleForTesting
-  public static MailboxReceiveOperator createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
+  public static MailboxReceiveOperator createReduceStageOperator(MailboxService<TransferableBlock> mailboxService,
       List<ServerInstance> sendingInstances, long jobId, int stageId, DataSchema dataSchema, String hostname,
       int port) {
     MailboxReceiveOperator mailboxReceiveOperator =
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 5c1e952c5d..4fd942068e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -19,17 +19,12 @@
 package org.apache.pinot.query.mailbox;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
-import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.query.mailbox.channel.ChannelUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -38,28 +33,31 @@ import org.testng.annotations.Test;
 
 public class GrpcMailboxServiceTest extends GrpcMailboxServiceTestBase {
 
+  private static final DataSchema TEST_DATA_SCHEMA = new DataSchema(new String[]{"foo", "bar"},
+      new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+
   @Test
   public void testHappyPath()
       throws Exception {
     Preconditions.checkState(_mailboxServices.size() >= 2);
     Map.Entry<Integer, GrpcMailboxService> sender = _mailboxServices.firstEntry();
     Map.Entry<Integer, GrpcMailboxService> receiver = _mailboxServices.lastEntry();
-    String mailboxId =
-        String.format("happyPath:localhost:%d:localhost:%d", sender.getKey(), receiver.getKey());
-    SendingMailbox<Mailbox.MailboxContent> sendingMailbox = sender.getValue().getSendingMailbox(mailboxId);
-    ReceivingMailbox<Mailbox.MailboxContent> receivingMailbox = receiver.getValue().getReceivingMailbox(mailboxId);
+    StringMailboxIdentifier mailboxId = new StringMailboxIdentifier(
+        "happypath", "localhost", sender.getKey(), "localhost", receiver.getKey());
+    SendingMailbox<TransferableBlock> sendingMailbox = sender.getValue().getSendingMailbox(mailboxId);
+    ReceivingMailbox<TransferableBlock> receivingMailbox = receiver.getValue().getReceivingMailbox(mailboxId);
 
     // create mock object
-    Mailbox.MailboxContent testContent = getTestMailboxContent(mailboxId);
-    sendingMailbox.send(testContent);
+    TransferableBlock testBlock = getTestTransferableBlock();
+    sendingMailbox.send(testBlock);
 
     // wait for receiving mailbox to be created.
     TestUtils.waitForCondition(aVoid -> {
       return receivingMailbox.isInitialized();
     }, 5000L, "Receiving mailbox initialize failed!");
 
-    Mailbox.MailboxContent receivedContent = receivingMailbox.receive();
-    Assert.assertEquals(receivedContent, testContent);
+    TransferableBlock receivedBlock = receivingMailbox.receive();
+    Assert.assertEquals(receivedBlock.getDataBlock().toBytes(), testBlock.getDataBlock().toBytes());
 
     sendingMailbox.complete();
 
@@ -68,19 +66,23 @@ public class GrpcMailboxServiceTest extends GrpcMailboxServiceTestBase {
     }, 5000L, "Receiving mailbox is not closed properly!");
   }
 
+  /**
+   * Simulates a case where the sender tries to send a very large message. The receiver should receive a
+   * MetadataBlock with an exception to indicate failure.
+   */
   @Test
   public void testGrpcException()
       throws Exception {
     Preconditions.checkState(_mailboxServices.size() >= 2);
     Map.Entry<Integer, GrpcMailboxService> sender = _mailboxServices.firstEntry();
     Map.Entry<Integer, GrpcMailboxService> receiver = _mailboxServices.lastEntry();
-    String mailboxId =
-        String.format("exception:localhost:%d:localhost:%d", sender.getKey(), receiver.getKey());
-    SendingMailbox<Mailbox.MailboxContent> sendingMailbox = sender.getValue().getSendingMailbox(mailboxId);
-    ReceivingMailbox<Mailbox.MailboxContent> receivingMailbox = receiver.getValue().getReceivingMailbox(mailboxId);
+    StringMailboxIdentifier mailboxId = new StringMailboxIdentifier(
+        "exception", "localhost", sender.getKey(), "localhost", receiver.getKey());
+    GrpcSendingMailbox sendingMailbox = (GrpcSendingMailbox) sender.getValue().getSendingMailbox(mailboxId);
+    GrpcReceivingMailbox receivingMailbox = (GrpcReceivingMailbox) receiver.getValue().getReceivingMailbox(mailboxId);
 
     // create mock object
-    Mailbox.MailboxContent testContent = getTooLargeMailboxContent(mailboxId);
+    TransferableBlock testContent = getTooLargeTransferableBlock();
     sendingMailbox.send(testContent);
 
     // wait for receiving mailbox to be created.
@@ -88,30 +90,32 @@ public class GrpcMailboxServiceTest extends GrpcMailboxServiceTestBase {
       return receivingMailbox.isInitialized();
     }, 5000L, "Receiving mailbox initialize failed!");
 
-    Mailbox.MailboxContent receivedContent = receivingMailbox.receive();
+    TransferableBlock receivedContent = receivingMailbox.receive();
     Assert.assertNotNull(receivedContent);
-    ByteBuffer byteBuffer = receivedContent.getPayload().asReadOnlyByteBuffer();
-    Assert.assertTrue(byteBuffer.hasRemaining());
-    BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
-    Assert.assertTrue(dataBlock instanceof MetadataBlock && !dataBlock.getExceptions().isEmpty());
+    BaseDataBlock receivedDataBlock = receivedContent.getDataBlock();
+    Assert.assertTrue(receivedDataBlock instanceof MetadataBlock);
+    Assert.assertFalse(receivedDataBlock.getExceptions().isEmpty());
+  }
+
+  private TransferableBlock getTestTransferableBlock() {
+    List<Object[]> rows = new ArrayList<>();
+    rows.add(createRow(0, "test_string"));
+    return new TransferableBlock(rows, TEST_DATA_SCHEMA, BaseDataBlock.Type.ROW);
   }
 
-  private Mailbox.MailboxContent getTestMailboxContent(String mailboxId)
-      throws IOException {
-    return Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
-        .putAllMetadata(ImmutableMap.of("key", "value", ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true"))
-        .setPayload(ByteString.copyFrom(new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(new DataSchema(
-            new String[]{"foo", "bar"},
-            new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}))
-        ).getDataBlock().toBytes()))
-        .build();
+  private TransferableBlock getTooLargeTransferableBlock() {
+    final int size = 1_000_000;
+    List<Object[]> rows = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      rows.add(createRow(0, "test_string"));
+    }
+    return new TransferableBlock(rows, TEST_DATA_SCHEMA, BaseDataBlock.Type.ROW);
   }
 
-  private Mailbox.MailboxContent getTooLargeMailboxContent(String mailboxId)
-      throws IOException {
-    return Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId)
-        .putAllMetadata(ImmutableMap.of("key", "value", ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true"))
-        .setPayload(ByteString.copyFrom(new byte[16_000_000]))
-        .build();
+  private Object[] createRow(int intValue, String stringValue) {
+    Object[] row = new Object[2];
+    row[0] = intValue;
+    row[1] = stringValue;
+    return row;
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
new file mode 100644
index 0000000000..6f41434e03
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+import org.apache.pinot.common.datablock.DataBlockUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class InMemoryMailboxServiceTest {
+
+  private static final DataSchema TEST_DATA_SCHEMA = new DataSchema(new String[]{"foo", "bar"},
+      new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+
+  @Test
+  public void testHappyPath()
+      throws Exception {
+    InMemoryMailboxService mailboxService = new InMemoryMailboxService("localhost", 0);
+    final StringMailboxIdentifier mailboxId = new StringMailboxIdentifier(
+        "happyPathJob", "localhost", 0, "localhost", 0);
+    InMemoryReceivingMailbox receivingMailbox = (InMemoryReceivingMailbox) mailboxService.getReceivingMailbox(
+        mailboxId);
+    InMemorySendingMailbox sendingMailbox = (InMemorySendingMailbox) mailboxService.getSendingMailbox(mailboxId);
+
+    // Sends are non-blocking as long as channel capacity is not breached
+    for (int i = 0; i < InMemoryMailboxService.DEFAULT_CHANNEL_CAPACITY; i++) {
+      sendingMailbox.send(getTestTransferableBlock(i, i + 1 == InMemoryMailboxService.DEFAULT_CHANNEL_CAPACITY));
+    }
+    sendingMailbox.complete();
+
+    // Iterate 1 less time than the loop above
+    for (int i = 0; i + 1 < InMemoryMailboxService.DEFAULT_CHANNEL_CAPACITY; i++) {
+      TransferableBlock receivedBlock = receivingMailbox.receive();
+      List<Object[]> receivedContainer = receivedBlock.getContainer();
+      Assert.assertEquals(receivedContainer.size(), 1);
+      Object[] row = receivedContainer.get(0);
+      Assert.assertEquals(row.length, 2);
+      Assert.assertEquals((int) row[0], i);
+
+      // Receiving mailbox is considered closed if the underlying channel is closed AND the channel is empty, i.e.
+      // all the queued blocks are consumed.
+      Assert.assertFalse(receivingMailbox.isClosed());
+    }
+    // Receive the last block
+    Assert.assertTrue(receivingMailbox.receive().isEndOfStreamBlock());
+    Assert.assertTrue(receivingMailbox.isClosed());
+  }
+
+  /**
+   * Mailbox receiver/sender won't be created if the mailbox-id is not local.
+   */
+  @Test
+  public void testNonLocalMailboxId() {
+    InMemoryMailboxService mailboxService = new InMemoryMailboxService("localhost", 0);
+    final StringMailboxIdentifier mailboxId = new StringMailboxIdentifier(
+        "happyPathJob", "localhost", 0, "localhost", 1);
+
+    // Test getReceivingMailbox
+    try {
+      mailboxService.getReceivingMailbox(mailboxId);
+      Assert.fail("Method call above should have failed");
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("non-local transport"));
+    }
+
+    // Test getSendingMailbox
+    try {
+      mailboxService.getSendingMailbox(mailboxId);
+      Assert.fail("Method call above should have failed");
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("non-local transport"));
+    }
+  }
+
+  private TransferableBlock getTestTransferableBlock(int index, boolean isEndOfStream) {
+    if (isEndOfStream) { // terminal block: return an end-of-stream marker instead of a data row
+      return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(TEST_DATA_SCHEMA));
+    }
+    List<Object[]> rows = new ArrayList<>(1); // one row per block; index is the row's value, not a capacity
+    rows.add(new Object[]{index, "test_data"}); // row layout follows TEST_DATA_SCHEMA: (INT foo, STRING bar)
+    return new TransferableBlock(rows, TEST_DATA_SCHEMA, BaseDataBlock.Type.ROW);
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
new file mode 100644
index 0000000000..347fcd0f59
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link MultiplexingMailboxService}, run against Mockito mocks of the gRPC and
+ * in-memory mailbox services it is constructed with.
+ */
+public class MultiplexingMailboxServiceTest {
+  // Identifier whose two host/port pairs are identical.
+  private static final StringMailboxIdentifier LOCAL_MAILBOX_ID = new StringMailboxIdentifier(
+      "localJobId", "localhost", 0, "localhost", 0);
+  // Identifier whose two host/port pairs differ in port (0 vs 1).
+  private static final StringMailboxIdentifier NON_LOCAL_MAILBOX_ID = new StringMailboxIdentifier(
+      "localJobId", "localhost", 0, "localhost", 1);
+
+  /**
+   * Verifies that start/shutdown are forwarded to both underlying services, that hostname and port
+   * are reported as configured on the mocks, and that the local mailbox-id yields in-memory
+   * mailboxes while the non-local mailbox-id yields gRPC mailboxes.
+   */
+  @Test
+  public void testHappyPath() {
+    // Setup mock Grpc and InMemory mailbox service
+    GrpcMailboxService grpcMailboxService = Mockito.mock(GrpcMailboxService.class);
+    InMemoryMailboxService inMemoryMailboxService = Mockito.mock(InMemoryMailboxService.class);
+    Mockito.doReturn("localhost").when(grpcMailboxService).getHostname();
+    Mockito.doReturn("localhost").when(inMemoryMailboxService).getHostname();
+    Mockito.doReturn(1000).when(grpcMailboxService).getMailboxPort();
+    Mockito.doReturn(1000).when(inMemoryMailboxService).getMailboxPort();
+    // Each mock service hands out a mailbox of its own flavour, so the instanceof checks below
+    // reveal which service the multiplexer delegated to.
+    Mockito.doReturn(Mockito.mock(InMemorySendingMailbox.class)).when(inMemoryMailboxService).getSendingMailbox(
+        Mockito.any());
+    Mockito.doReturn(Mockito.mock(InMemoryReceivingMailbox.class)).when(inMemoryMailboxService).getReceivingMailbox(
+        Mockito.any());
+    Mockito.doReturn(Mockito.mock(GrpcSendingMailbox.class)).when(grpcMailboxService).getSendingMailbox(
+        Mockito.any());
+    Mockito.doReturn(Mockito.mock(GrpcReceivingMailbox.class)).when(grpcMailboxService).getReceivingMailbox(
+        Mockito.any());
+
+    // Create multiplex service with mocks
+    MultiplexingMailboxService multiplexService = new MultiplexingMailboxService(grpcMailboxService,
+        inMemoryMailboxService);
+
+    // Ensure both underlying services are started
+    multiplexService.start();
+    Mockito.verify(grpcMailboxService, Mockito.times(1)).start();
+    Mockito.verify(inMemoryMailboxService, Mockito.times(1)).start();
+
+    // Ensure hostname and ports are returned accurately.
+    // TestNG's assertEquals takes (actual, expected); keeping that order makes failure messages accurate.
+    Assert.assertEquals(multiplexService.getHostname(), "localhost");
+    Assert.assertEquals(multiplexService.getMailboxPort(), 1000);
+
+    Assert.assertTrue(multiplexService.getSendingMailbox(LOCAL_MAILBOX_ID) instanceof InMemorySendingMailbox);
+    Assert.assertTrue(multiplexService.getSendingMailbox(NON_LOCAL_MAILBOX_ID) instanceof GrpcSendingMailbox);
+
+    Assert.assertTrue(multiplexService.getReceivingMailbox(LOCAL_MAILBOX_ID) instanceof InMemoryReceivingMailbox);
+    Assert.assertTrue(multiplexService.getReceivingMailbox(NON_LOCAL_MAILBOX_ID) instanceof GrpcReceivingMailbox);
+
+    // Ensure both underlying services are shut down
+    multiplexService.shutdown();
+    Mockito.verify(grpcMailboxService, Mockito.times(1)).shutdown();
+    Mockito.verify(inMemoryMailboxService, Mockito.times(1)).shutdown();
+  }
+
+  /**
+   * Verifies that constructing the multiplexing service fails with an
+   * {@link IllegalStateException} when the two underlying services report different ports.
+   */
+  @Test
+  public void testInConsistentHostPort() {
+    // Make the underlying services return different ports
+    GrpcMailboxService grpcMailboxService = Mockito.mock(GrpcMailboxService.class);
+    InMemoryMailboxService inMemoryMailboxService = Mockito.mock(InMemoryMailboxService.class);
+    Mockito.doReturn("localhost").when(grpcMailboxService).getHostname();
+    Mockito.doReturn("localhost").when(inMemoryMailboxService).getHostname();
+    Mockito.doReturn(1000).when(grpcMailboxService).getMailboxPort();
+    Mockito.doReturn(1001).when(inMemoryMailboxService).getMailboxPort();
+
+    Assert.assertThrows(IllegalStateException.class,
+        () -> new MultiplexingMailboxService(grpcMailboxService, inMemoryMailboxService));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org