You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/14 08:03:08 UTC
(pinot) branch master updated: [Multi-stage] Fix SortedMailboxReceiveOperator to not pull 2 EOS blocks (#12406)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a5ae4d738a [Multi-stage] Fix SortedMailboxReceiveOperator to not pull 2 EOS blocks (#12406)
a5ae4d738a is described below
commit a5ae4d738a0e646d500b6bb3e2031f7227f035e2
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Feb 14 00:03:02 2024 -0800
[Multi-stage] Fix SortedMailboxReceiveOperator to not pull 2 EOS blocks (#12406)
---
.../runtime/operator/BaseMailboxReceiveOperator.java | 10 +---------
.../query/runtime/operator/MailboxReceiveOperator.java | 4 ++--
.../runtime/operator/SortedMailboxReceiveOperator.java | 17 +++++++++++------
3 files changed, 14 insertions(+), 17 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 808caba04e..a88e122739 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -47,7 +47,7 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
protected final MailboxService _mailboxService;
protected final RelDistribution.Type _exchangeType;
protected final List<String> _mailboxIds;
- private final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer;
+ protected final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer;
public BaseMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
int senderStageId) {
@@ -73,14 +73,6 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
new BlockingMultiStreamConsumer.OfTransferableBlock(context.getId(), context.getDeadlineMs(), asyncStreams);
}
- protected BlockingMultiStreamConsumer.OfTransferableBlock getMultiConsumer() {
- return _multiConsumer;
- }
-
- public List<String> getMailboxIds() {
- return _mailboxIds;
- }
-
@Override
protected void earlyTerminate() {
_isEarlyTerminated = true;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index ad7913cdc1..584b49640f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -42,13 +42,13 @@ public class MailboxReceiveOperator extends BaseMailboxReceiveOperator {
@Override
protected TransferableBlock getNextBlock() {
- TransferableBlock block = getMultiConsumer().readBlockBlocking();
+ TransferableBlock block = _multiConsumer.readBlockBlocking();
// When the early termination flag is set, the caller expects an EOS block to be returned. However, since the 2
// stages between the sending/receiving mailbox set the early termination flag asynchronously, there is a chance that
// the next block pulled out of the ReceivingMailbox is an already buffered normal data block. This requires the
// MailboxReceiveOperator to keep pulling and dropping data blocks until an EOS block is observed.
while (_isEarlyTerminated && !block.isEndOfStreamBlock()) {
- block = getMultiConsumer().readBlockBlocking();
+ block = _multiConsumer.readBlockBlocking();
}
return block;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
index 05804c12d3..8949ad569a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
@@ -52,7 +52,7 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
private final boolean _isSortOnSender;
private final List<Object[]> _rows = new ArrayList<>();
- private boolean _isSortedBlockConstructed;
+ private TransferableBlock _eosBlock;
public SortedMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
DataSchema dataSchema, List<RexExpression> collationKeys, List<Direction> collationDirections,
@@ -74,20 +74,25 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
@Override
protected TransferableBlock getNextBlock() {
- while (true) { // loop in order to keep asking if we receive data blocks
- TransferableBlock block = getMultiConsumer().readBlockBlocking();
+ if (_eosBlock != null) {
+ return _eosBlock;
+ }
+ // Collect all the rows from the mailbox and sort them
+ while (true) {
+ TransferableBlock block = _multiConsumer.readBlockBlocking();
if (block.isDataBlock()) {
_rows.addAll(block.getContainer());
} else if (block.isErrorBlock()) {
return block;
} else {
assert block.isSuccessfulEndOfStreamBlock();
-
- if (!_isSortedBlockConstructed && !_rows.isEmpty()) {
+ if (!_rows.isEmpty()) {
+ _eosBlock = block;
+ // TODO: This might not be efficient because we are sorting all the received rows. We should use a k-way merge
+ // when sender side is sorted.
_rows.sort(
new SortUtils.SortComparator(_collationKeys, _collationDirections, _collationNullDirections, _dataSchema,
false));
- _isSortedBlockConstructed = true;
return new TransferableBlock(_rows, _dataSchema, DataBlock.Type.ROW);
} else {
return block;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org