You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/02/02 23:14:17 UTC

[pinot] branch master updated: [multistage] [bugfix] Fix sending mailbox object leak (#10190)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bbb9455eb [multistage] [bugfix] Fix sending mailbox object leak (#10190)
0bbb9455eb is described below

commit 0bbb9455eb7aa374e0c8b1e2c5ec0a4d6cd7a93e
Author: Yao Liu <ya...@startree.ai>
AuthorDate: Thu Feb 2 15:14:10 2023 -0800

    [multistage] [bugfix] Fix sending mailbox object leak (#10190)
    
    * cancel
    
    * clean up
    
    * error format
    
    * all type support and fix
    
    * fix sending mailbox leak
    
    * fix lint
    
    * address comments
    
    * address comments
---
 .../pinot/query/mailbox/GrpcMailboxService.java    | 19 ++++--
 .../pinot/query/mailbox/GrpcSendingMailbox.java    | 38 +++++-------
 .../query/mailbox/InMemoryMailboxService.java      | 44 ++++----------
 .../query/mailbox/InMemorySendingMailbox.java      |  8 ++-
 .../apache/pinot/query/mailbox/SendingMailbox.java |  2 +
 .../runtime/operator/exchange/BlockExchange.java   | 53 ++++++----------
 .../operator/exchange/BroadcastExchange.java       | 15 +++--
 .../runtime/operator/exchange/HashExchange.java    | 34 +++++------
 .../runtime/operator/exchange/RandomExchange.java  | 20 +++---
 .../operator/exchange/SingletonExchange.java       | 17 +++---
 .../operator/exchange/BlockExchangeTest.java       | 71 +++++++---------------
 .../operator/exchange/BroadcastExchangeTest.java   | 36 +++++------
 .../operator/exchange/HashExchangeTest.java        | 44 ++++++--------
 .../operator/exchange/RandomExchangeTest.java      | 30 +++++----
 .../operator/exchange/SingletonExchangeTest.java   | 27 ++++----
 15 files changed, 190 insertions(+), 268 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
index fc42388a11..37fd81b3d2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -19,9 +19,14 @@
 package org.apache.pinot.query.mailbox;
 
 import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Consumer;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
+import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
@@ -52,8 +57,6 @@ public class GrpcMailboxService implements MailboxService<TransferableBlock> {
   // maintaining a list of registered mailboxes.
   private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
       new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
-      new ConcurrentHashMap<>();
   private final Consumer<MailboxIdentifier> _gotMailCallback;
 
   public GrpcMailboxService(String hostname, int mailboxPort, PinotConfiguration extraConfig,
@@ -89,7 +92,13 @@ public class GrpcMailboxService implements MailboxService<TransferableBlock> {
    * @param mailboxId the id of the mailbox.
    */
   public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) {
-    return _sendingMailboxMap.computeIfAbsent(mailboxId.toString(), (mId) -> new GrpcSendingMailbox(mId, this));
+    ManagedChannel channel = getChannel(mailboxId.toString());
+    PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
+    CountDownLatch latch = new CountDownLatch(1);
+    StreamObserver<Mailbox.MailboxContent> mailboxContentStreamObserver =
+        stub.open(new MailboxStatusStreamObserver(latch));
+    GrpcSendingMailbox mailbox = new GrpcSendingMailbox(mailboxId.toString(), mailboxContentStreamObserver, latch);
+    return mailbox;
   }
 
   /**
@@ -97,8 +106,8 @@ public class GrpcMailboxService implements MailboxService<TransferableBlock> {
    * @param mailboxId the id of the mailbox.
    */
   public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
-    return _receivingMailboxMap.computeIfAbsent(
-        mailboxId.toString(), (mId) -> new GrpcReceivingMailbox(mId, _gotMailCallback));
+    return _receivingMailboxMap.computeIfAbsent(mailboxId.toString(),
+        (mId) -> new GrpcReceivingMailbox(mId, _gotMailCallback));
   }
 
   public ManagedChannel getChannel(String mailboxId) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 1f446952a8..d2f4de89c2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.query.mailbox;
 
 import com.google.protobuf.ByteString;
-import io.grpc.ManagedChannel;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
@@ -30,9 +29,7 @@ import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
-import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.channel.ChannelUtils;
-import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 
 
@@ -40,39 +37,27 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
  * GRPC implementation of the {@link SendingMailbox}.
  */
 public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
-  private final GrpcMailboxService _mailboxService;
   private final String _mailboxId;
   private final AtomicBoolean _initialized = new AtomicBoolean(false);
   private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
 
-  private CountDownLatch _finishLatch = new CountDownLatch(1);
+  private final CountDownLatch _finishLatch;
+  private final StreamObserver<MailboxContent> _mailboxContentStreamObserver;
 
-  private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
-
-  public GrpcSendingMailbox(String mailboxId, GrpcMailboxService mailboxService) {
-    _mailboxService = mailboxService;
+  public GrpcSendingMailbox(String mailboxId, StreamObserver<MailboxContent> mailboxContentStreamObserver,
+      CountDownLatch latch) {
     _mailboxId = mailboxId;
+    _mailboxContentStreamObserver = mailboxContentStreamObserver;
+    _finishLatch = latch;
     _initialized.set(false);
   }
 
-  public void init()
-      throws UnsupportedOperationException {
-    ManagedChannel channel = _mailboxService.getChannel(_mailboxId);
-    PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
-    _mailboxContentStreamObserver = stub.open(new MailboxStatusStreamObserver(_finishLatch));
-    // TODO: Replace init call with metadata.
-    // send a begin-of-stream message.
-    _mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
-        .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
-    _initialized.set(true);
-  }
-
   @Override
   public void send(TransferableBlock block)
       throws UnsupportedOperationException {
     if (!_initialized.get()) {
       // initialization is special
-      init();
+      open();
     }
     MailboxContent data = toMailboxContent(block.getDataBlock());
     _mailboxContentStreamObserver.onNext(data);
@@ -84,6 +69,15 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
     _mailboxContentStreamObserver.onCompleted();
   }
 
+  @Override
+  public void open() {
+    // TODO: Get rid of init call.
+    // send a begin-of-stream message.
+    _mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
+        .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
+    _initialized.set(true);
+  }
+
   @Override
   public String getMailboxId() {
     return _mailboxId;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
index 6a6b0bb875..b5bcb9057b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
@@ -32,7 +32,8 @@ public class InMemoryMailboxService implements MailboxService<TransferableBlock>
   private final int _mailboxPort;
   private final Consumer<MailboxIdentifier> _receivedMailContentCallback;
 
-  private final ConcurrentHashMap<String, InMemoryMailboxState> _mailboxStateMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ReceivingMailbox> _receivingMailbox = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, BlockingQueue> _mailboxQueue = new ConcurrentHashMap<>();
 
   public InMemoryMailboxService(String hostname, int mailboxPort,
       Consumer<MailboxIdentifier> receivedMailContentCallback) {
@@ -41,6 +42,10 @@ public class InMemoryMailboxService implements MailboxService<TransferableBlock>
     _receivedMailContentCallback = receivedMailContentCallback;
   }
 
+  public Consumer<MailboxIdentifier> getReceivedMailContentCallback() {
+    return _receivedMailContentCallback;
+  }
+
   @Override
   public void start() {
   }
@@ -62,24 +67,6 @@ public class InMemoryMailboxService implements MailboxService<TransferableBlock>
   public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) {
     Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
     String mId = mailboxId.toString();
-    return _mailboxStateMap.computeIfAbsent(mId, this::newMailboxState)._sendingMailbox;
-  }
-
-  public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
-    Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
-    String mId = mailboxId.toString();
-    return _mailboxStateMap.computeIfAbsent(mId, this::newMailboxState)._receivingMailbox;
-  }
-
-  InMemoryMailboxState newMailboxState(String mailboxId) {
-    BlockingQueue<TransferableBlock> queue = createDefaultChannel();
-    return new InMemoryMailboxState(
-        new InMemorySendingMailbox(mailboxId, queue, _receivedMailContentCallback),
-        new InMemoryReceivingMailbox(mailboxId, queue),
-        queue);
-  }
-
-  private BlockingQueue<TransferableBlock> createDefaultChannel() {
     // for now, we use an unbounded blocking queue as the means of communication between
     // in memory mailboxes - the reason for this is that unless we implement flow control,
     // blocks will sit in memory either way (blocking the sender from sending doesn't prevent
@@ -88,19 +75,14 @@ public class InMemoryMailboxService implements MailboxService<TransferableBlock>
     // threads (most importantly, the receiving thread) from running - which can cause unnecessary
     // failure situations
     // TODO: when we implement flow control, we should swap this out with a bounded abstraction
-    return new LinkedBlockingQueue<>();
+    return new InMemorySendingMailbox(mailboxId.toString(),
+        _mailboxQueue.computeIfAbsent(mId, id -> new LinkedBlockingQueue<>()), getReceivedMailContentCallback());
   }
 
-  static class InMemoryMailboxState {
-    ReceivingMailbox<TransferableBlock> _receivingMailbox;
-    SendingMailbox<TransferableBlock> _sendingMailbox;
-    BlockingQueue<TransferableBlock> _queue;
-
-    InMemoryMailboxState(SendingMailbox<TransferableBlock> sendingMailbox,
-        ReceivingMailbox<TransferableBlock> receivingMailbox, BlockingQueue<TransferableBlock> queue) {
-      _receivingMailbox = receivingMailbox;
-      _sendingMailbox = sendingMailbox;
-      _queue = queue;
-    }
+  public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
+    Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
+    String mId = mailboxId.toString();
+    BlockingQueue mailboxQueue = _mailboxQueue.computeIfAbsent(mId, id -> new LinkedBlockingQueue<>());
+    return _receivingMailbox.computeIfAbsent(mId, id -> new InMemoryReceivingMailbox(mId, mailboxQueue));
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 81528f7106..18dcd8ffd3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -25,10 +25,12 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 
 
 public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
-  private final BlockingQueue<TransferableBlock> _queue;
   private final Consumer<MailboxIdentifier> _gotMailCallback;
   private final String _mailboxId;
 
+  // TODO: change to a 2-way communication channel.
+  private BlockingQueue<TransferableBlock> _queue;
+
   public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue,
       Consumer<MailboxIdentifier> gotMailCallback) {
     _mailboxId = mailboxId;
@@ -36,6 +38,10 @@ public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock>
     _gotMailCallback = gotMailCallback;
   }
 
+  @Override
+  public void open() {
+  }
+
   @Override
   public String getMailboxId() {
     return _mailboxId;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 7d7b547324..6cc162b0ec 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -30,6 +30,8 @@ import java.util.concurrent.TimeUnit;
  */
 public interface SendingMailbox<T> {
 
+  void open();
+
   /**
    * get the unique identifier for the mailbox.
    *
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index 2309b9dcb1..e6f131b33e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.operator.exchange;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.calcite.rel.RelDistribution;
@@ -38,23 +39,25 @@ public abstract class BlockExchange {
   // TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override.
   // TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer
   private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
-
-  private final MailboxService<TransferableBlock> _mailbox;
-  private final List<MailboxIdentifier> _destinations;
+  private final List<SendingMailbox<TransferableBlock>> _sendingMailboxes;
   private final BlockSplitter _splitter;
 
   public static BlockExchange getExchange(MailboxService<TransferableBlock> mailboxService,
-      List<MailboxIdentifier> destinations, RelDistribution.Type exchangeType,
-      KeySelector<Object[], Object[]> selector, BlockSplitter splitter) {
+      List<MailboxIdentifier> destinations, RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> selector,
+      BlockSplitter splitter) {
+    List<SendingMailbox<TransferableBlock>> sendingMailboxes = new ArrayList<>();
+    for (MailboxIdentifier mid : destinations) {
+      sendingMailboxes.add(mailboxService.getSendingMailbox(mid));
+    }
     switch (exchangeType) {
       case SINGLETON:
-        return new SingletonExchange(mailboxService, destinations, splitter);
+        return new SingletonExchange(sendingMailboxes, splitter);
       case HASH_DISTRIBUTED:
-        return new HashExchange(mailboxService, destinations, selector, splitter);
+        return new HashExchange(sendingMailboxes, selector, splitter);
       case RANDOM_DISTRIBUTED:
-        return new RandomExchange(mailboxService, destinations, splitter);
+        return new RandomExchange(sendingMailboxes, splitter);
       case BROADCAST_DISTRIBUTED:
-        return new BroadcastExchange(mailboxService, destinations, splitter);
+        return new BroadcastExchange(sendingMailboxes, splitter);
       case ROUND_ROBIN_DISTRIBUTED:
       case RANGE_DISTRIBUTED:
       case ANY:
@@ -63,29 +66,20 @@ public abstract class BlockExchange {
     }
   }
 
-  protected BlockExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
-      BlockSplitter splitter) {
-    _mailbox = mailbox;
-    _destinations = destinations;
+  protected BlockExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, BlockSplitter splitter) {
+    _sendingMailboxes = sendingMailboxes;
     _splitter = splitter;
   }
 
   public void send(TransferableBlock block) {
     if (block.isEndOfStreamBlock()) {
-      _destinations.forEach(destination -> sendBlock(destination, block));
+      _sendingMailboxes.forEach(destination -> sendBlock(destination, block));
       return;
     }
-
-    Iterator<RoutedBlock> routedBlocks = route(_destinations, block);
-    while (routedBlocks.hasNext()) {
-      RoutedBlock next = routedBlocks.next();
-      sendBlock(next._destination, next._block);
-    }
+    route(_sendingMailboxes, block);
   }
 
-  private void sendBlock(MailboxIdentifier mailboxId, TransferableBlock block) {
-    SendingMailbox<TransferableBlock> sendingMailbox = _mailbox.getSendingMailbox(mailboxId);
-
+  protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox, TransferableBlock block) {
     if (block.isEndOfStreamBlock()) {
       sendingMailbox.send(block);
       sendingMailbox.complete();
@@ -94,21 +88,10 @@ public abstract class BlockExchange {
 
     DataBlock.Type type = block.getType();
     Iterator<TransferableBlock> splits = _splitter.split(block, type, MAX_MAILBOX_CONTENT_SIZE_BYTES);
-
     while (splits.hasNext()) {
       sendingMailbox.send(splits.next());
     }
   }
 
-  protected abstract Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block);
-
-  protected static class RoutedBlock {
-    final MailboxIdentifier _destination;
-    final TransferableBlock _block;
-
-    protected RoutedBlock(MailboxIdentifier destination, TransferableBlock block) {
-      _destination = destination;
-      _block = block;
-    }
-  }
+  protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block);
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
index dc1dc2e9dd..932f7593b6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
@@ -18,10 +18,8 @@
  */
 package org.apache.pinot.query.runtime.operator.exchange;
 
-import java.util.Iterator;
 import java.util.List;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 
@@ -31,13 +29,14 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
  */
 class BroadcastExchange extends BlockExchange {
 
-  protected BroadcastExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
-      BlockSplitter splitter) {
-    super(mailbox, destinations, splitter);
+  protected BroadcastExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, BlockSplitter splitter) {
+    super(sendingMailboxes, splitter);
   }
 
   @Override
-  protected Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block) {
-    return destinations.stream().map(dest -> new RoutedBlock(dest, block)).iterator();
+  protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) {
+    for (SendingMailbox mailbox : destinations) {
+      sendBlock(mailbox, block);
+    }
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
index 2371221740..99f0e04f91 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
@@ -18,14 +18,9 @@
  */
 package org.apache.pinot.query.runtime.operator.exchange;
 
-import com.google.common.collect.Iterators;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -41,25 +36,26 @@ class HashExchange extends BlockExchange {
   // TODO: ensure that server instance list is sorted using same function in sender.
   private final KeySelector<Object[], Object[]> _keySelector;
 
-  HashExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
-      KeySelector<Object[], Object[]> selector, BlockSplitter splitter) {
-    super(mailbox, destinations, splitter);
+  HashExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, KeySelector<Object[], Object[]> selector,
+      BlockSplitter splitter) {
+    super(sendingMailboxes, splitter);
     _keySelector = selector;
   }
 
   @Override
-  protected Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block) {
-    Map<Integer, List<Object[]>> destIdxToRows = new HashMap<>();
-
+  protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) {
+    List<Object[]>[] destIdxToRows = new List[destinations.size()];
     for (Object[] row : block.getContainer()) {
       int partition = _keySelector.computeHash(row) % destinations.size();
-      destIdxToRows.computeIfAbsent(partition, k -> new ArrayList<>()).add(row);
+      if (destIdxToRows[partition] == null) {
+        destIdxToRows[partition] = new ArrayList<>();
+      }
+      destIdxToRows[partition].add(row);
+    }
+    for (int i = 0; i < destinations.size(); i++) {
+      if (destIdxToRows[i] != null) {
+        sendBlock(destinations.get(i), new TransferableBlock(destIdxToRows[i], block.getDataSchema(), block.getType()));
+      }
     }
-
-    return Iterators.transform(
-        destIdxToRows.entrySet().iterator(),
-        partitionAndBlock -> new RoutedBlock(
-            destinations.get(partitionAndBlock.getKey()),
-            new TransferableBlock(partitionAndBlock.getValue(), block.getDataSchema(), block.getType())));
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
index d12cc3ae46..0073e28bf0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
@@ -19,13 +19,10 @@
 package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterators;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.function.IntFunction;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 
@@ -39,21 +36,20 @@ class RandomExchange extends BlockExchange {
 
   private final IntFunction<Integer> _rand;
 
-  RandomExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
-      BlockSplitter splitter) {
-    this(mailbox, destinations, RANDOM::nextInt, splitter);
+  RandomExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, BlockSplitter splitter) {
+    this(sendingMailboxes, RANDOM::nextInt, splitter);
   }
 
   @VisibleForTesting
-  RandomExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
-      IntFunction<Integer> rand, BlockSplitter splitter) {
-    super(mailbox, destinations, splitter);
+  RandomExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, IntFunction<Integer> rand,
+      BlockSplitter splitter) {
+    super(sendingMailboxes, splitter);
     _rand = rand;
   }
 
   @Override
-  protected Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block) {
+  protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) {
     int destinationIdx = _rand.apply(destinations.size());
-    return Iterators.singletonIterator(new RoutedBlock(destinations.get(destinationIdx), block));
+    sendBlock(destinations.get(destinationIdx), block);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
index 94568e57db..713cfa5ba2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
@@ -18,12 +18,8 @@
  */
 package org.apache.pinot.query.runtime.operator.exchange;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import java.util.Iterator;
 import java.util.List;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 
@@ -34,13 +30,14 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
  */
 class SingletonExchange extends BlockExchange {
 
-  SingletonExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
-      BlockSplitter splitter) {
-    super(mailbox, destinations, splitter);
+  SingletonExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, BlockSplitter splitter) {
+    super(sendingMailboxes, splitter);
   }
 
   @Override
-  protected Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block) {
-    return Iterators.singletonIterator(new RoutedBlock(Iterables.getOnlyElement(destinations), block));
+  protected void route(List<SendingMailbox<TransferableBlock>> mailbox, TransferableBlock block) {
+    for (SendingMailbox sendingMailbox : mailbox) {
+      sendBlock(sendingMailbox, block);
+    }
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index 43663c5e0a..033f010a8c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -20,15 +20,10 @@ package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
-import java.util.Iterator;
 import java.util.List;
-import java.util.function.BiFunction;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -44,13 +39,8 @@ import org.testng.annotations.Test;
 
 
 public class BlockExchangeTest {
-  private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
-  private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
-
   private AutoCloseable _mocks;
 
-  @Mock
-  private MailboxService<TransferableBlock> _mailboxService;
   @Mock
   private SendingMailbox<TransferableBlock> _mailbox1;
   @Mock
@@ -59,8 +49,6 @@ public class BlockExchangeTest {
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
-    Mockito.when(_mailboxService.getSendingMailbox(MAILBOX_1)).thenReturn(_mailbox1);
-    Mockito.when(_mailboxService.getSendingMailbox(MAILBOX_2)).thenReturn(_mailbox2);
   }
 
   @AfterMethod
@@ -72,13 +60,8 @@ public class BlockExchangeTest {
   @Test
   public void shouldSendEosBlockToAllDestinations() {
     // Given:
-    List<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
-    BlockExchange exchange = new TestBlockExchange(
-        _mailboxService,
-        destinations,
-        (dest, block) -> Iterators.singletonIterator(new BlockExchange.RoutedBlock(MAILBOX_1, block))
-    );
-
+    List<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1, _mailbox2);
+    BlockExchange exchange = new TestBlockExchange(destinations);
     // When:
     exchange.send(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
@@ -97,12 +80,8 @@ public class BlockExchangeTest {
   @Test
   public void shouldSendDataBlocksOnlyToTargetDestination() {
     // Given:
-    List<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
-    BlockExchange exchange = new TestBlockExchange(
-        _mailboxService,
-        destinations,
-        (dest, block) -> Iterators.singletonIterator(new BlockExchange.RoutedBlock(MAILBOX_1, block))
-    );
+    List<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1);
+    BlockExchange exchange = new TestBlockExchange(destinations);
     TransferableBlock block = new TransferableBlock(ImmutableList.of(new Object[]{"val"}),
         new DataSchema(new String[]{"foo"}, new ColumnDataType[]{ColumnDataType.STRING}), DataBlock.Type.ROW);
 
@@ -120,25 +99,21 @@ public class BlockExchangeTest {
   @Test
   public void shouldSplitBlocks() {
     // Given:
-    List<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
+    List<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1);
 
     DataSchema schema = new DataSchema(new String[]{"foo"}, new ColumnDataType[]{ColumnDataType.STRING});
 
-    TransferableBlock inBlock = new TransferableBlock(
-        ImmutableList.of(new Object[]{"one"}, new Object[]{"two"}), schema, DataBlock.Type.ROW);
+    TransferableBlock inBlock =
+        new TransferableBlock(ImmutableList.of(new Object[]{"one"}, new Object[]{"two"}), schema, DataBlock.Type.ROW);
 
-    TransferableBlock outBlockOne = new TransferableBlock(
-        ImmutableList.of(new Object[]{"one"}), schema, DataBlock.Type.ROW);
+    TransferableBlock outBlockOne =
+        new TransferableBlock(ImmutableList.of(new Object[]{"one"}), schema, DataBlock.Type.ROW);
 
-    TransferableBlock outBlockTwo = new TransferableBlock(
-        ImmutableList.of(new Object[]{"two"}), schema, DataBlock.Type.ROW);
+    TransferableBlock outBlockTwo =
+        new TransferableBlock(ImmutableList.of(new Object[]{"two"}), schema, DataBlock.Type.ROW);
 
-    BlockExchange exchange = new TestBlockExchange(
-        _mailboxService,
-        destinations,
-        (dest, block) -> Iterators.singletonIterator(new BlockExchange.RoutedBlock(MAILBOX_1, block)),
-        (block, type, maxSize) -> ImmutableList.of(outBlockOne, outBlockTwo).iterator()
-    );
+    BlockExchange exchange = new TestBlockExchange(destinations,
+        (block, type, maxSize) -> ImmutableList.of(outBlockOne, outBlockTwo).iterator());
 
     // When:
     exchange.send(inBlock);
@@ -154,23 +129,19 @@ public class BlockExchangeTest {
   }
 
   private static class TestBlockExchange extends BlockExchange {
-
-    private final BiFunction<List<MailboxIdentifier>, TransferableBlock, Iterator<RoutedBlock>> _router;
-
-    protected TestBlockExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
-        BiFunction<List<MailboxIdentifier>, TransferableBlock, Iterator<RoutedBlock>> router) {
-      this(mailbox, destinations, router, (block, type, size) -> Iterators.singletonIterator(block));
+    protected TestBlockExchange(List<SendingMailbox<TransferableBlock>> destinations) {
+      this(destinations, (block, type, size) -> Iterators.singletonIterator(block));
     }
 
-    protected TestBlockExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
-        BiFunction<List<MailboxIdentifier>, TransferableBlock, Iterator<RoutedBlock>> router, BlockSplitter splitter) {
-      super(mailbox, destinations, splitter);
-      _router = router;
+    protected TestBlockExchange(List<SendingMailbox<TransferableBlock>> destinations, BlockSplitter splitter) {
+      super(destinations, splitter);
     }
 
     @Override
-    protected Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block) {
-      return _router.apply(destinations, block);
+    protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) {
+      for (SendingMailbox mailbox : destinations) {
+        sendBlock(mailbox, block);
+      }
     }
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
index eccd31fe6a..b39d8e43fb 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
@@ -19,13 +19,13 @@
 package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.collect.ImmutableList;
-import java.util.Iterator;
-import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -34,19 +34,20 @@ import org.testng.annotations.Test;
 
 
 public class BroadcastExchangeTest {
-  private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
-  private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
-
   private AutoCloseable _mocks;
 
   @Mock
   TransferableBlock _block;
+
+  @Mock
+  private SendingMailbox<TransferableBlock> _mailbox1;
   @Mock
-  MailboxService<TransferableBlock> _mailboxService;
+  private SendingMailbox<TransferableBlock> _mailbox2;
 
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_block.getType()).thenReturn(DataBlock.Type.METADATA);
   }
 
   @AfterMethod
@@ -58,22 +59,17 @@ public class BroadcastExchangeTest {
   @Test
   public void shouldBroadcast() {
     // Given:
-    ImmutableList<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
+    ImmutableList<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1, _mailbox2);
 
     // When:
-    Iterator<BlockExchange.RoutedBlock> route =
-        new BroadcastExchange(_mailboxService, destinations, TransferableBlockUtils::splitBlock)
-            .route(destinations, _block);
+    new BroadcastExchange(destinations, TransferableBlockUtils::splitBlock).route(destinations, _block);
 
-    // Then:
-    BlockExchange.RoutedBlock routedBlock = route.next();
-    Assert.assertEquals(routedBlock._destination, MAILBOX_1);
-    Assert.assertEquals(routedBlock._block, _block);
+    ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
 
-    routedBlock = route.next();
-    Assert.assertEquals(routedBlock._destination, MAILBOX_2);
-    Assert.assertEquals(routedBlock._block, _block);
+    Mockito.verify(_mailbox1, Mockito.times(1)).send(captor.capture());
+    Assert.assertEquals(captor.getValue(), _block);
 
-    Assert.assertFalse(route.hasNext(), "should be done with routing");
+    Mockito.verify(_mailbox2, Mockito.times(1)).send(captor.capture());
+    Assert.assertEquals(captor.getValue(), _block);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
index 31ff4a70f3..50b8b4bdc9 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
@@ -21,12 +21,13 @@ package org.apache.pinot.query.runtime.operator.exchange;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import java.util.Iterator;
-import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -37,19 +38,22 @@ import org.testng.annotations.Test;
 
 
 public class HashExchangeTest {
-  private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
-  private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
+  @Mock
+  private SendingMailbox<TransferableBlock> _mailbox1;
+  @Mock
+  private SendingMailbox<TransferableBlock> _mailbox2;
 
   private AutoCloseable _mocks;
 
   @Mock
   TransferableBlock _block;
-  @Mock
-  MailboxService<TransferableBlock> _mailboxService;
 
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_block.getType()).thenReturn(DataBlock.Type.ROW);
+    Mockito.when(_block.getDataSchema()).thenReturn(
+        new DataSchema(new String[]{"col1"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
   }
 
   @AfterMethod
@@ -62,29 +66,21 @@ public class HashExchangeTest {
   public void shouldSplitAndRouteBlocksBasedOnPartitionKey() {
     // Given:
     TestSelector selector = new TestSelector(Iterators.forArray(2, 0, 1));
-    Mockito.when(_block.getContainer()).thenReturn(ImmutableList.of(
-        new Object[]{0},
-        new Object[]{1},
-        new Object[]{2}
-    ));
-    ImmutableList<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
+    Mockito.when(_block.getContainer()).thenReturn(ImmutableList.of(new Object[]{0}, new Object[]{1}, new Object[]{2}));
+    ImmutableList<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1, _mailbox2);
 
     // When:
-    Iterator<BlockExchange.RoutedBlock> route =
-        new HashExchange(_mailboxService, destinations, selector, TransferableBlockUtils::splitBlock)
-            .route(destinations, _block);
+    new HashExchange(destinations, selector, TransferableBlockUtils::splitBlock).route(destinations, _block);
 
     // Then:
-    BlockExchange.RoutedBlock block1 = route.next();
-    Assert.assertEquals(block1._destination, MAILBOX_1);
-    Assert.assertEquals(block1._block.getContainer().get(0), new Object[]{0});
-    Assert.assertEquals(block1._block.getContainer().get(1), new Object[]{1});
+    ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
 
-    BlockExchange.RoutedBlock block2 = route.next();
-    Assert.assertEquals(block2._destination, MAILBOX_2);
-    Assert.assertEquals(block2._block.getContainer().get(0), new Object[]{2});
+    Mockito.verify(_mailbox1, Mockito.times(1)).send(captor.capture());
+    Assert.assertEquals(captor.getValue().getContainer().get(0), new Object[]{0});
+    Assert.assertEquals(captor.getValue().getContainer().get(1), new Object[]{1});
 
-    Assert.assertFalse(route.hasNext());
+    Mockito.verify(_mailbox2, Mockito.times(1)).send(captor.capture());
+    Assert.assertEquals(captor.getValue().getContainer().get(0), new Object[]{2});
   }
 
   private static class TestSelector implements KeySelector<Object[], Object[]> {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
index c93080824c..ab97ea6196 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
@@ -19,33 +19,34 @@
 package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.collect.ImmutableList;
-import java.util.Iterator;
-import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-public class RandomExchangeTest {
-  private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
-  private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
 
+public class RandomExchangeTest {
   private AutoCloseable _mocks;
 
   @Mock
   TransferableBlock _block;
   @Mock
-  MailboxService<TransferableBlock> _mailboxService;
+  private SendingMailbox<TransferableBlock> _mailbox1;
+  @Mock
+  private SendingMailbox<TransferableBlock> _mailbox2;
 
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_block.getType()).thenReturn(DataBlock.Type.METADATA);
   }
 
   @AfterMethod
@@ -57,17 +58,14 @@ public class RandomExchangeTest {
   @Test
   public void shouldRouteRandomly() {
     // Given:
-    ImmutableList<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
+    ImmutableList<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1, _mailbox2);
 
     // When:
-    Iterator<BlockExchange.RoutedBlock> route =
-        new RandomExchange(_mailboxService, destinations, size -> 1, TransferableBlockUtils::splitBlock)
-            .route(destinations, _block);
+    new RandomExchange(destinations, size -> 1, TransferableBlockUtils::splitBlock).route(destinations, _block);
 
+    ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
     // Then:
-    BlockExchange.RoutedBlock routedBlock = route.next();
-    Assert.assertEquals(routedBlock._destination, MAILBOX_2);
-    Assert.assertEquals(routedBlock._block, _block);
-    Assert.assertFalse(route.hasNext(), "should be done with routing");
+    Mockito.verify(_mailbox2, Mockito.times(1)).send(captor.capture());
+    Assert.assertEquals(captor.getValue(), _block);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
index 6b9dcd53f1..09855be2a4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
@@ -19,13 +19,13 @@
 package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.collect.ImmutableList;
-import java.util.Iterator;
-import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -34,18 +34,17 @@ import org.testng.annotations.Test;
 
 
 public class SingletonExchangeTest {
-  private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
-
   private AutoCloseable _mocks;
 
   @Mock
   TransferableBlock _block;
   @Mock
-  MailboxService<TransferableBlock> _mailboxService;
+  private SendingMailbox<TransferableBlock> _mailbox1;
 
   @BeforeMethod
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(_block.getType()).thenReturn(DataBlock.Type.METADATA);
   }
 
   @AfterMethod
@@ -57,17 +56,15 @@ public class SingletonExchangeTest {
   @Test
   public void shouldRouteSingleton() {
     // Given:
-    ImmutableList<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1);
+    ImmutableList<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1);
 
     // When:
-    Iterator<BlockExchange.RoutedBlock> route =
-        new SingletonExchange(_mailboxService, destinations, TransferableBlockUtils::splitBlock)
-            .route(destinations, _block);
+    new SingletonExchange(destinations, TransferableBlockUtils::splitBlock).route(destinations, _block);
 
     // Then:
-    BlockExchange.RoutedBlock routedBlock = route.next();
-    Assert.assertEquals(routedBlock._destination, MAILBOX_1);
-    Assert.assertEquals(routedBlock._block, _block);
-    Assert.assertFalse(route.hasNext(), "should be done with routing");
+    ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
+    // Then:
+    Mockito.verify(_mailbox1, Mockito.times(1)).send(captor.capture());
+    Assert.assertEquals(captor.getValue(), _block);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org