You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/02/02 23:14:17 UTC
[pinot] branch master updated: [multistage] [bugfix] Fix sending mailbox object leak (#10190)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0bbb9455eb [multistage] [bugfix] Fix sending mailbox object leak (#10190)
0bbb9455eb is described below
commit 0bbb9455eb7aa374e0c8b1e2c5ec0a4d6cd7a93e
Author: Yao Liu <ya...@startree.ai>
AuthorDate: Thu Feb 2 15:14:10 2023 -0800
[multistage] [bugfix] Fix sending mailbox object leak (#10190)
* cancel
* clean up
* error format
* all type support and fix
* fix sending mailbox leak
* fix lint
* address comments
* address comments
---
.../pinot/query/mailbox/GrpcMailboxService.java | 19 ++++--
.../pinot/query/mailbox/GrpcSendingMailbox.java | 38 +++++-------
.../query/mailbox/InMemoryMailboxService.java | 44 ++++----------
.../query/mailbox/InMemorySendingMailbox.java | 8 ++-
.../apache/pinot/query/mailbox/SendingMailbox.java | 2 +
.../runtime/operator/exchange/BlockExchange.java | 53 ++++++----------
.../operator/exchange/BroadcastExchange.java | 15 +++--
.../runtime/operator/exchange/HashExchange.java | 34 +++++------
.../runtime/operator/exchange/RandomExchange.java | 20 +++---
.../operator/exchange/SingletonExchange.java | 17 +++---
.../operator/exchange/BlockExchangeTest.java | 71 +++++++---------------
.../operator/exchange/BroadcastExchangeTest.java | 36 +++++------
.../operator/exchange/HashExchangeTest.java | 44 ++++++--------
.../operator/exchange/RandomExchangeTest.java | 30 +++++----
.../operator/exchange/SingletonExchangeTest.java | 27 ++++----
15 files changed, 190 insertions(+), 268 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
index fc42388a11..37fd81b3d2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -19,9 +19,14 @@
package org.apache.pinot.query.mailbox;
import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.common.proto.PinotMailboxGrpc;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
+import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -52,8 +57,6 @@ public class GrpcMailboxService implements MailboxService<TransferableBlock> {
// maintaining a list of registered mailboxes.
private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
new ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
- new ConcurrentHashMap<>();
private final Consumer<MailboxIdentifier> _gotMailCallback;
public GrpcMailboxService(String hostname, int mailboxPort, PinotConfiguration extraConfig,
@@ -89,7 +92,13 @@ public class GrpcMailboxService implements MailboxService<TransferableBlock> {
* @param mailboxId the id of the mailbox.
*/
public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) {
- return _sendingMailboxMap.computeIfAbsent(mailboxId.toString(), (mId) -> new GrpcSendingMailbox(mId, this));
+ ManagedChannel channel = getChannel(mailboxId.toString());
+ PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
+ CountDownLatch latch = new CountDownLatch(1);
+ StreamObserver<Mailbox.MailboxContent> mailboxContentStreamObserver =
+ stub.open(new MailboxStatusStreamObserver(latch));
+ GrpcSendingMailbox mailbox = new GrpcSendingMailbox(mailboxId.toString(), mailboxContentStreamObserver, latch);
+ return mailbox;
}
/**
@@ -97,8 +106,8 @@ public class GrpcMailboxService implements MailboxService<TransferableBlock> {
* @param mailboxId the id of the mailbox.
*/
public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
- return _receivingMailboxMap.computeIfAbsent(
- mailboxId.toString(), (mId) -> new GrpcReceivingMailbox(mId, _gotMailCallback));
+ return _receivingMailboxMap.computeIfAbsent(mailboxId.toString(),
+ (mId) -> new GrpcReceivingMailbox(mId, _gotMailCallback));
}
public ManagedChannel getChannel(String mailboxId) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 1f446952a8..d2f4de89c2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -19,7 +19,6 @@
package org.apache.pinot.query.mailbox;
import com.google.protobuf.ByteString;
-import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
@@ -30,9 +29,7 @@ import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
-import org.apache.pinot.common.proto.PinotMailboxGrpc;
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
-import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -40,39 +37,27 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
* GRPC implementation of the {@link SendingMailbox}.
*/
public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
- private final GrpcMailboxService _mailboxService;
private final String _mailboxId;
private final AtomicBoolean _initialized = new AtomicBoolean(false);
private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
- private CountDownLatch _finishLatch = new CountDownLatch(1);
+ private final CountDownLatch _finishLatch;
+ private final StreamObserver<MailboxContent> _mailboxContentStreamObserver;
- private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
-
- public GrpcSendingMailbox(String mailboxId, GrpcMailboxService mailboxService) {
- _mailboxService = mailboxService;
+ public GrpcSendingMailbox(String mailboxId, StreamObserver<MailboxContent> mailboxContentStreamObserver,
+ CountDownLatch latch) {
_mailboxId = mailboxId;
+ _mailboxContentStreamObserver = mailboxContentStreamObserver;
+ _finishLatch = latch;
_initialized.set(false);
}
- public void init()
- throws UnsupportedOperationException {
- ManagedChannel channel = _mailboxService.getChannel(_mailboxId);
- PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
- _mailboxContentStreamObserver = stub.open(new MailboxStatusStreamObserver(_finishLatch));
- // TODO: Replace init call with metadata.
- // send a begin-of-stream message.
- _mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
- .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
- _initialized.set(true);
- }
-
@Override
public void send(TransferableBlock block)
throws UnsupportedOperationException {
if (!_initialized.get()) {
// initialization is special
- init();
+ open();
}
MailboxContent data = toMailboxContent(block.getDataBlock());
_mailboxContentStreamObserver.onNext(data);
@@ -84,6 +69,15 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
_mailboxContentStreamObserver.onCompleted();
}
+ @Override
+ public void open() {
+ // TODO: Get rid of init call.
+ // send a begin-of-stream message.
+ _mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
+ _initialized.set(true);
+ }
+
@Override
public String getMailboxId() {
return _mailboxId;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
index 6a6b0bb875..b5bcb9057b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
@@ -32,7 +32,8 @@ public class InMemoryMailboxService implements MailboxService<TransferableBlock>
private final int _mailboxPort;
private final Consumer<MailboxIdentifier> _receivedMailContentCallback;
- private final ConcurrentHashMap<String, InMemoryMailboxState> _mailboxStateMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, ReceivingMailbox> _receivingMailbox = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, BlockingQueue> _mailboxQueue = new ConcurrentHashMap<>();
public InMemoryMailboxService(String hostname, int mailboxPort,
Consumer<MailboxIdentifier> receivedMailContentCallback) {
@@ -41,6 +42,10 @@ public class InMemoryMailboxService implements MailboxService<TransferableBlock>
_receivedMailContentCallback = receivedMailContentCallback;
}
+ public Consumer<MailboxIdentifier> getReceivedMailContentCallback() {
+ return _receivedMailContentCallback;
+ }
+
@Override
public void start() {
}
@@ -62,24 +67,6 @@ public class InMemoryMailboxService implements MailboxService<TransferableBlock>
public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier mailboxId) {
Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
String mId = mailboxId.toString();
- return _mailboxStateMap.computeIfAbsent(mId, this::newMailboxState)._sendingMailbox;
- }
-
- public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
- Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
- String mId = mailboxId.toString();
- return _mailboxStateMap.computeIfAbsent(mId, this::newMailboxState)._receivingMailbox;
- }
-
- InMemoryMailboxState newMailboxState(String mailboxId) {
- BlockingQueue<TransferableBlock> queue = createDefaultChannel();
- return new InMemoryMailboxState(
- new InMemorySendingMailbox(mailboxId, queue, _receivedMailContentCallback),
- new InMemoryReceivingMailbox(mailboxId, queue),
- queue);
- }
-
- private BlockingQueue<TransferableBlock> createDefaultChannel() {
// for now, we use an unbounded blocking queue as the means of communication between
// in memory mailboxes - the reason for this is that unless we implement flow control,
// blocks will sit in memory either way (blocking the sender from sending doesn't prevent
@@ -88,19 +75,14 @@ public class InMemoryMailboxService implements MailboxService<TransferableBlock>
// threads (most importantly, the receiving thread) from running - which can cause unnecessary
// failure situations
// TODO: when we implement flow control, we should swap this out with a bounded abstraction
- return new LinkedBlockingQueue<>();
+ return new InMemorySendingMailbox(mailboxId.toString(),
+ _mailboxQueue.computeIfAbsent(mId, id -> new LinkedBlockingQueue<>()), getReceivedMailContentCallback());
}
- static class InMemoryMailboxState {
- ReceivingMailbox<TransferableBlock> _receivingMailbox;
- SendingMailbox<TransferableBlock> _sendingMailbox;
- BlockingQueue<TransferableBlock> _queue;
-
- InMemoryMailboxState(SendingMailbox<TransferableBlock> sendingMailbox,
- ReceivingMailbox<TransferableBlock> receivingMailbox, BlockingQueue<TransferableBlock> queue) {
- _receivingMailbox = receivingMailbox;
- _sendingMailbox = sendingMailbox;
- _queue = queue;
- }
+ public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
+ Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory mailbox service for non-local transport");
+ String mId = mailboxId.toString();
+ BlockingQueue mailboxQueue = _mailboxQueue.computeIfAbsent(mId, id -> new LinkedBlockingQueue<>());
+ return _receivingMailbox.computeIfAbsent(mId, id -> new InMemoryReceivingMailbox(mId, mailboxQueue));
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 81528f7106..18dcd8ffd3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -25,10 +25,12 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
- private final BlockingQueue<TransferableBlock> _queue;
private final Consumer<MailboxIdentifier> _gotMailCallback;
private final String _mailboxId;
+ // TODO: changed to 2-way communication channel.
+ private BlockingQueue<TransferableBlock> _queue;
+
public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue,
Consumer<MailboxIdentifier> gotMailCallback) {
_mailboxId = mailboxId;
@@ -36,6 +38,10 @@ public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock>
_gotMailCallback = gotMailCallback;
}
+ @Override
+ public void open() {
+ }
+
@Override
public String getMailboxId() {
return _mailboxId;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 7d7b547324..6cc162b0ec 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -30,6 +30,8 @@ import java.util.concurrent.TimeUnit;
*/
public interface SendingMailbox<T> {
+ void open();
+
/**
* get the unique identifier for the mailbox.
*
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index 2309b9dcb1..e6f131b33e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime.operator.exchange;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.calcite.rel.RelDistribution;
@@ -38,23 +39,25 @@ public abstract class BlockExchange {
// TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override.
// TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer
private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
-
- private final MailboxService<TransferableBlock> _mailbox;
- private final List<MailboxIdentifier> _destinations;
+ private final List<SendingMailbox<TransferableBlock>> _sendingMailboxes;
private final BlockSplitter _splitter;
public static BlockExchange getExchange(MailboxService<TransferableBlock> mailboxService,
- List<MailboxIdentifier> destinations, RelDistribution.Type exchangeType,
- KeySelector<Object[], Object[]> selector, BlockSplitter splitter) {
+ List<MailboxIdentifier> destinations, RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> selector,
+ BlockSplitter splitter) {
+ List<SendingMailbox<TransferableBlock>> sendingMailboxes = new ArrayList<>();
+ for (MailboxIdentifier mid : destinations) {
+ sendingMailboxes.add(mailboxService.getSendingMailbox(mid));
+ }
switch (exchangeType) {
case SINGLETON:
- return new SingletonExchange(mailboxService, destinations, splitter);
+ return new SingletonExchange(sendingMailboxes, splitter);
case HASH_DISTRIBUTED:
- return new HashExchange(mailboxService, destinations, selector, splitter);
+ return new HashExchange(sendingMailboxes, selector, splitter);
case RANDOM_DISTRIBUTED:
- return new RandomExchange(mailboxService, destinations, splitter);
+ return new RandomExchange(sendingMailboxes, splitter);
case BROADCAST_DISTRIBUTED:
- return new BroadcastExchange(mailboxService, destinations, splitter);
+ return new BroadcastExchange(sendingMailboxes, splitter);
case ROUND_ROBIN_DISTRIBUTED:
case RANGE_DISTRIBUTED:
case ANY:
@@ -63,29 +66,20 @@ public abstract class BlockExchange {
}
}
- protected BlockExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
- BlockSplitter splitter) {
- _mailbox = mailbox;
- _destinations = destinations;
+ protected BlockExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, BlockSplitter splitter) {
+ _sendingMailboxes = sendingMailboxes;
_splitter = splitter;
}
public void send(TransferableBlock block) {
if (block.isEndOfStreamBlock()) {
- _destinations.forEach(destination -> sendBlock(destination, block));
+ _sendingMailboxes.forEach(destination -> sendBlock(destination, block));
return;
}
-
- Iterator<RoutedBlock> routedBlocks = route(_destinations, block);
- while (routedBlocks.hasNext()) {
- RoutedBlock next = routedBlocks.next();
- sendBlock(next._destination, next._block);
- }
+ route(_sendingMailboxes, block);
}
- private void sendBlock(MailboxIdentifier mailboxId, TransferableBlock block) {
- SendingMailbox<TransferableBlock> sendingMailbox = _mailbox.getSendingMailbox(mailboxId);
-
+ protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox, TransferableBlock block) {
if (block.isEndOfStreamBlock()) {
sendingMailbox.send(block);
sendingMailbox.complete();
@@ -94,21 +88,10 @@ public abstract class BlockExchange {
DataBlock.Type type = block.getType();
Iterator<TransferableBlock> splits = _splitter.split(block, type, MAX_MAILBOX_CONTENT_SIZE_BYTES);
-
while (splits.hasNext()) {
sendingMailbox.send(splits.next());
}
}
- protected abstract Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block);
-
- protected static class RoutedBlock {
- final MailboxIdentifier _destination;
- final TransferableBlock _block;
-
- protected RoutedBlock(MailboxIdentifier destination, TransferableBlock block) {
- _destination = destination;
- _block = block;
- }
- }
+ protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
index dc1dc2e9dd..932f7593b6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
@@ -18,10 +18,8 @@
*/
package org.apache.pinot.query.runtime.operator.exchange;
-import java.util.Iterator;
import java.util.List;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -31,13 +29,14 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
*/
class BroadcastExchange extends BlockExchange {
- protected BroadcastExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
- BlockSplitter splitter) {
- super(mailbox, destinations, splitter);
+ protected BroadcastExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, BlockSplitter splitter) {
+ super(sendingMailboxes, splitter);
}
@Override
- protected Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block) {
- return destinations.stream().map(dest -> new RoutedBlock(dest, block)).iterator();
+ protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) {
+ for (SendingMailbox mailbox : destinations) {
+ sendBlock(mailbox, block);
+ }
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
index 2371221740..99f0e04f91 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
@@ -18,14 +18,9 @@
*/
package org.apache.pinot.query.runtime.operator.exchange;
-import com.google.common.collect.Iterators;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -41,25 +36,26 @@ class HashExchange extends BlockExchange {
// TODO: ensure that server instance list is sorted using same function in sender.
private final KeySelector<Object[], Object[]> _keySelector;
- HashExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
- KeySelector<Object[], Object[]> selector, BlockSplitter splitter) {
- super(mailbox, destinations, splitter);
+ HashExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, KeySelector<Object[], Object[]> selector,
+ BlockSplitter splitter) {
+ super(sendingMailboxes, splitter);
_keySelector = selector;
}
@Override
- protected Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block) {
- Map<Integer, List<Object[]>> destIdxToRows = new HashMap<>();
-
+ protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) {
+ List<Object[]>[] destIdxToRows = new List[destinations.size()];
for (Object[] row : block.getContainer()) {
int partition = _keySelector.computeHash(row) % destinations.size();
- destIdxToRows.computeIfAbsent(partition, k -> new ArrayList<>()).add(row);
+ if (destIdxToRows[partition] == null) {
+ destIdxToRows[partition] = new ArrayList<>();
+ }
+ destIdxToRows[partition].add(row);
+ }
+ for (int i = 0; i < destinations.size(); i++) {
+ if (destIdxToRows[i] != null) {
+ sendBlock(destinations.get(i), new TransferableBlock(destIdxToRows[i], block.getDataSchema(), block.getType()));
+ }
}
-
- return Iterators.transform(
- destIdxToRows.entrySet().iterator(),
- partitionAndBlock -> new RoutedBlock(
- destinations.get(partitionAndBlock.getKey()),
- new TransferableBlock(partitionAndBlock.getValue(), block.getDataSchema(), block.getType())));
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
index d12cc3ae46..0073e28bf0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
@@ -19,13 +19,10 @@
package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterators;
-import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.function.IntFunction;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -39,21 +36,20 @@ class RandomExchange extends BlockExchange {
private final IntFunction<Integer> _rand;
- RandomExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
- BlockSplitter splitter) {
- this(mailbox, destinations, RANDOM::nextInt, splitter);
+ RandomExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, BlockSplitter splitter) {
+ this(sendingMailboxes, RANDOM::nextInt, splitter);
}
@VisibleForTesting
- RandomExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
- IntFunction<Integer> rand, BlockSplitter splitter) {
- super(mailbox, destinations, splitter);
+ RandomExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, IntFunction<Integer> rand,
+ BlockSplitter splitter) {
+ super(sendingMailboxes, splitter);
_rand = rand;
}
@Override
- protected Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block) {
+ protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) {
int destinationIdx = _rand.apply(destinations.size());
- return Iterators.singletonIterator(new RoutedBlock(destinations.get(destinationIdx), block));
+ sendBlock(destinations.get(destinationIdx), block);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
index 94568e57db..713cfa5ba2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
@@ -18,12 +18,8 @@
*/
package org.apache.pinot.query.runtime.operator.exchange;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import java.util.Iterator;
import java.util.List;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -34,13 +30,14 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
*/
class SingletonExchange extends BlockExchange {
- SingletonExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
- BlockSplitter splitter) {
- super(mailbox, destinations, splitter);
+ SingletonExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes, BlockSplitter splitter) {
+ super(sendingMailboxes, splitter);
}
@Override
- protected Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block) {
- return Iterators.singletonIterator(new RoutedBlock(Iterables.getOnlyElement(destinations), block));
+ protected void route(List<SendingMailbox<TransferableBlock>> mailbox, TransferableBlock block) {
+ for (SendingMailbox sendingMailbox : mailbox) {
+ sendBlock(sendingMailbox, block);
+ }
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index 43663c5e0a..033f010a8c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -20,15 +20,10 @@ package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
-import java.util.Iterator;
import java.util.List;
-import java.util.function.BiFunction;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -44,13 +39,8 @@ import org.testng.annotations.Test;
public class BlockExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
- private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
-
private AutoCloseable _mocks;
- @Mock
- private MailboxService<TransferableBlock> _mailboxService;
@Mock
private SendingMailbox<TransferableBlock> _mailbox1;
@Mock
@@ -59,8 +49,6 @@ public class BlockExchangeTest {
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
- Mockito.when(_mailboxService.getSendingMailbox(MAILBOX_1)).thenReturn(_mailbox1);
- Mockito.when(_mailboxService.getSendingMailbox(MAILBOX_2)).thenReturn(_mailbox2);
}
@AfterMethod
@@ -72,13 +60,8 @@ public class BlockExchangeTest {
@Test
public void shouldSendEosBlockToAllDestinations() {
// Given:
- List<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
- BlockExchange exchange = new TestBlockExchange(
- _mailboxService,
- destinations,
- (dest, block) -> Iterators.singletonIterator(new BlockExchange.RoutedBlock(MAILBOX_1, block))
- );
-
+ List<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1, _mailbox2);
+ BlockExchange exchange = new TestBlockExchange(destinations);
// When:
exchange.send(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -97,12 +80,8 @@ public class BlockExchangeTest {
@Test
public void shouldSendDataBlocksOnlyToTargetDestination() {
// Given:
- List<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
- BlockExchange exchange = new TestBlockExchange(
- _mailboxService,
- destinations,
- (dest, block) -> Iterators.singletonIterator(new BlockExchange.RoutedBlock(MAILBOX_1, block))
- );
+ List<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1);
+ BlockExchange exchange = new TestBlockExchange(destinations);
TransferableBlock block = new TransferableBlock(ImmutableList.of(new Object[]{"val"}),
new DataSchema(new String[]{"foo"}, new ColumnDataType[]{ColumnDataType.STRING}), DataBlock.Type.ROW);
@@ -120,25 +99,21 @@ public class BlockExchangeTest {
@Test
public void shouldSplitBlocks() {
// Given:
- List<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
+ List<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1);
DataSchema schema = new DataSchema(new String[]{"foo"}, new ColumnDataType[]{ColumnDataType.STRING});
- TransferableBlock inBlock = new TransferableBlock(
- ImmutableList.of(new Object[]{"one"}, new Object[]{"two"}), schema, DataBlock.Type.ROW);
+ TransferableBlock inBlock =
+ new TransferableBlock(ImmutableList.of(new Object[]{"one"}, new Object[]{"two"}), schema, DataBlock.Type.ROW);
- TransferableBlock outBlockOne = new TransferableBlock(
- ImmutableList.of(new Object[]{"one"}), schema, DataBlock.Type.ROW);
+ TransferableBlock outBlockOne =
+ new TransferableBlock(ImmutableList.of(new Object[]{"one"}), schema, DataBlock.Type.ROW);
- TransferableBlock outBlockTwo = new TransferableBlock(
- ImmutableList.of(new Object[]{"two"}), schema, DataBlock.Type.ROW);
+ TransferableBlock outBlockTwo =
+ new TransferableBlock(ImmutableList.of(new Object[]{"two"}), schema, DataBlock.Type.ROW);
- BlockExchange exchange = new TestBlockExchange(
- _mailboxService,
- destinations,
- (dest, block) -> Iterators.singletonIterator(new BlockExchange.RoutedBlock(MAILBOX_1, block)),
- (block, type, maxSize) -> ImmutableList.of(outBlockOne, outBlockTwo).iterator()
- );
+ BlockExchange exchange = new TestBlockExchange(destinations,
+ (block, type, maxSize) -> ImmutableList.of(outBlockOne, outBlockTwo).iterator());
// When:
exchange.send(inBlock);
@@ -154,23 +129,19 @@ public class BlockExchangeTest {
}
private static class TestBlockExchange extends BlockExchange {
-
- private final BiFunction<List<MailboxIdentifier>, TransferableBlock, Iterator<RoutedBlock>> _router;
-
- protected TestBlockExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
- BiFunction<List<MailboxIdentifier>, TransferableBlock, Iterator<RoutedBlock>> router) {
- this(mailbox, destinations, router, (block, type, size) -> Iterators.singletonIterator(block));
+ protected TestBlockExchange(List<SendingMailbox<TransferableBlock>> destinations) {
+ this(destinations, (block, type, size) -> Iterators.singletonIterator(block));
}
- protected TestBlockExchange(MailboxService<TransferableBlock> mailbox, List<MailboxIdentifier> destinations,
- BiFunction<List<MailboxIdentifier>, TransferableBlock, Iterator<RoutedBlock>> router, BlockSplitter splitter) {
- super(mailbox, destinations, splitter);
- _router = router;
+ protected TestBlockExchange(List<SendingMailbox<TransferableBlock>> destinations, BlockSplitter splitter) {
+ super(destinations, splitter);
}
@Override
- protected Iterator<RoutedBlock> route(List<MailboxIdentifier> destinations, TransferableBlock block) {
- return _router.apply(destinations, block);
+ protected void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block) {
+ for (SendingMailbox mailbox : destinations) {
+ sendBlock(mailbox, block);
+ }
}
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
index eccd31fe6a..b39d8e43fb 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
@@ -19,13 +19,13 @@
package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.collect.ImmutableList;
-import java.util.Iterator;
-import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -34,19 +34,20 @@ import org.testng.annotations.Test;
public class BroadcastExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
- private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
-
private AutoCloseable _mocks;
@Mock
TransferableBlock _block;
+
+ @Mock
+ private SendingMailbox<TransferableBlock> _mailbox1;
@Mock
- MailboxService<TransferableBlock> _mailboxService;
+ private SendingMailbox<TransferableBlock> _mailbox2;
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
+ Mockito.when(_block.getType()).thenReturn(DataBlock.Type.METADATA);
}
@AfterMethod
@@ -58,22 +59,17 @@ public class BroadcastExchangeTest {
@Test
public void shouldBroadcast() {
// Given:
- ImmutableList<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
+ ImmutableList<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1, _mailbox2);
// When:
- Iterator<BlockExchange.RoutedBlock> route =
- new BroadcastExchange(_mailboxService, destinations, TransferableBlockUtils::splitBlock)
- .route(destinations, _block);
+ new BroadcastExchange(destinations, TransferableBlockUtils::splitBlock).route(destinations, _block);
- // Then:
- BlockExchange.RoutedBlock routedBlock = route.next();
- Assert.assertEquals(routedBlock._destination, MAILBOX_1);
- Assert.assertEquals(routedBlock._block, _block);
+ ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
- routedBlock = route.next();
- Assert.assertEquals(routedBlock._destination, MAILBOX_2);
- Assert.assertEquals(routedBlock._block, _block);
+ Mockito.verify(_mailbox1, Mockito.times(1)).send(captor.capture());
+ Assert.assertEquals(captor.getValue(), _block);
- Assert.assertFalse(route.hasNext(), "should be done with routing");
+ Mockito.verify(_mailbox2, Mockito.times(1)).send(captor.capture());
+ Assert.assertEquals(captor.getValue(), _block);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
index 31ff4a70f3..50b8b4bdc9 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
@@ -21,12 +21,13 @@ package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import java.util.Iterator;
-import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
@@ -37,19 +38,22 @@ import org.testng.annotations.Test;
public class HashExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
- private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
+ @Mock
+ private SendingMailbox<TransferableBlock> _mailbox1;
+ @Mock
+ private SendingMailbox<TransferableBlock> _mailbox2;
private AutoCloseable _mocks;
@Mock
TransferableBlock _block;
- @Mock
- MailboxService<TransferableBlock> _mailboxService;
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
+ Mockito.when(_block.getType()).thenReturn(DataBlock.Type.ROW);
+ Mockito.when(_block.getDataSchema()).thenReturn(
+ new DataSchema(new String[]{"col1"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
}
@AfterMethod
@@ -62,29 +66,21 @@ public class HashExchangeTest {
public void shouldSplitAndRouteBlocksBasedOnPartitionKey() {
// Given:
TestSelector selector = new TestSelector(Iterators.forArray(2, 0, 1));
- Mockito.when(_block.getContainer()).thenReturn(ImmutableList.of(
- new Object[]{0},
- new Object[]{1},
- new Object[]{2}
- ));
- ImmutableList<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
+ Mockito.when(_block.getContainer()).thenReturn(ImmutableList.of(new Object[]{0}, new Object[]{1}, new Object[]{2}));
+ ImmutableList<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1, _mailbox2);
// When:
- Iterator<BlockExchange.RoutedBlock> route =
- new HashExchange(_mailboxService, destinations, selector, TransferableBlockUtils::splitBlock)
- .route(destinations, _block);
+ new HashExchange(destinations, selector, TransferableBlockUtils::splitBlock).route(destinations, _block);
// Then:
- BlockExchange.RoutedBlock block1 = route.next();
- Assert.assertEquals(block1._destination, MAILBOX_1);
- Assert.assertEquals(block1._block.getContainer().get(0), new Object[]{0});
- Assert.assertEquals(block1._block.getContainer().get(1), new Object[]{1});
+ ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
- BlockExchange.RoutedBlock block2 = route.next();
- Assert.assertEquals(block2._destination, MAILBOX_2);
- Assert.assertEquals(block2._block.getContainer().get(0), new Object[]{2});
+ Mockito.verify(_mailbox1, Mockito.times(1)).send(captor.capture());
+ Assert.assertEquals(captor.getValue().getContainer().get(0), new Object[]{0});
+ Assert.assertEquals(captor.getValue().getContainer().get(1), new Object[]{1});
- Assert.assertFalse(route.hasNext());
+ Mockito.verify(_mailbox2, Mockito.times(1)).send(captor.capture());
+ Assert.assertEquals(captor.getValue().getContainer().get(0), new Object[]{2});
}
private static class TestSelector implements KeySelector<Object[], Object[]> {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
index c93080824c..ab97ea6196 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
@@ -19,33 +19,34 @@
package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.collect.ImmutableList;
-import java.util.Iterator;
-import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-public class RandomExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
- private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:2");
+public class RandomExchangeTest {
private AutoCloseable _mocks;
@Mock
TransferableBlock _block;
@Mock
- MailboxService<TransferableBlock> _mailboxService;
+ private SendingMailbox<TransferableBlock> _mailbox1;
+ @Mock
+ private SendingMailbox<TransferableBlock> _mailbox2;
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
+ Mockito.when(_block.getType()).thenReturn(DataBlock.Type.METADATA);
}
@AfterMethod
@@ -57,17 +58,14 @@ public class RandomExchangeTest {
@Test
public void shouldRouteRandomly() {
// Given:
- ImmutableList<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1, MAILBOX_2);
+ ImmutableList<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1, _mailbox2);
// When:
- Iterator<BlockExchange.RoutedBlock> route =
- new RandomExchange(_mailboxService, destinations, size -> 1, TransferableBlockUtils::splitBlock)
- .route(destinations, _block);
+ new RandomExchange(destinations, size -> 1, TransferableBlockUtils::splitBlock).route(destinations, _block);
+ ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
// Then:
- BlockExchange.RoutedBlock routedBlock = route.next();
- Assert.assertEquals(routedBlock._destination, MAILBOX_2);
- Assert.assertEquals(routedBlock._block, _block);
- Assert.assertFalse(route.hasNext(), "should be done with routing");
+ Mockito.verify(_mailbox2, Mockito.times(1)).send(captor.capture());
+ Assert.assertEquals(captor.getValue(), _block);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
index 6b9dcd53f1..09855be2a4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
@@ -19,13 +19,13 @@
package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.collect.ImmutableList;
-import java.util.Iterator;
-import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -34,18 +34,17 @@ import org.testng.annotations.Test;
public class SingletonExchangeTest {
- private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1", "0@host:1", "0@host:1");
-
private AutoCloseable _mocks;
@Mock
TransferableBlock _block;
@Mock
- MailboxService<TransferableBlock> _mailboxService;
+ private SendingMailbox<TransferableBlock> _mailbox1;
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
+ Mockito.when(_block.getType()).thenReturn(DataBlock.Type.METADATA);
}
@AfterMethod
@@ -57,17 +56,15 @@ public class SingletonExchangeTest {
@Test
public void shouldRouteSingleton() {
// Given:
- ImmutableList<MailboxIdentifier> destinations = ImmutableList.of(MAILBOX_1);
+ ImmutableList<SendingMailbox<TransferableBlock>> destinations = ImmutableList.of(_mailbox1);
// When:
- Iterator<BlockExchange.RoutedBlock> route =
- new SingletonExchange(_mailboxService, destinations, TransferableBlockUtils::splitBlock)
- .route(destinations, _block);
+ new SingletonExchange(destinations, TransferableBlockUtils::splitBlock).route(destinations, _block);
// Then:
- BlockExchange.RoutedBlock routedBlock = route.next();
- Assert.assertEquals(routedBlock._destination, MAILBOX_1);
- Assert.assertEquals(routedBlock._block, _block);
- Assert.assertFalse(route.hasNext(), "should be done with routing");
+ ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
+ // Then:
+ Mockito.verify(_mailbox1, Mockito.times(1)).send(captor.capture());
+ Assert.assertEquals(captor.getValue(), _block);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org