You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this plain-text version.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/14 08:03:08 UTC

(pinot) branch master updated: [Multi-stage] Fix SortedMailboxReceiveOperator to not pull 2 EOS blocks (#12406)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a5ae4d738a [Multi-stage] Fix SortedMailboxReceiveOperator to not pull 2 EOS blocks (#12406)
a5ae4d738a is described below

commit a5ae4d738a0e646d500b6bb3e2031f7227f035e2
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Feb 14 00:03:02 2024 -0800

    [Multi-stage] Fix SortedMailboxReceiveOperator to not pull 2 EOS blocks (#12406)
---
 .../runtime/operator/BaseMailboxReceiveOperator.java    | 10 +---------
 .../query/runtime/operator/MailboxReceiveOperator.java  |  4 ++--
 .../runtime/operator/SortedMailboxReceiveOperator.java  | 17 +++++++++++------
 3 files changed, 14 insertions(+), 17 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 808caba04e..a88e122739 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -47,7 +47,7 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
   protected final MailboxService _mailboxService;
   protected final RelDistribution.Type _exchangeType;
   protected final List<String> _mailboxIds;
-  private final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer;
+  protected final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer;
 
   public BaseMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
       int senderStageId) {
@@ -73,14 +73,6 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
         new BlockingMultiStreamConsumer.OfTransferableBlock(context.getId(), context.getDeadlineMs(), asyncStreams);
   }
 
-  protected BlockingMultiStreamConsumer.OfTransferableBlock getMultiConsumer() {
-    return _multiConsumer;
-  }
-
-  public List<String> getMailboxIds() {
-    return _mailboxIds;
-  }
-
   @Override
   protected void earlyTerminate() {
     _isEarlyTerminated = true;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index ad7913cdc1..584b49640f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -42,13 +42,13 @@ public class MailboxReceiveOperator extends BaseMailboxReceiveOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    TransferableBlock block = getMultiConsumer().readBlockBlocking();
+    TransferableBlock block = _multiConsumer.readBlockBlocking();
     // When early termination flag is set, caller is expecting an EOS block to be returned, however since the 2 stages
     // between sending/receiving mailbox are setting early termination flag asynchronously, there's chances that the
     // next block pulled out of the ReceivingMailbox to be an already buffered normal data block. This requires the
     // MailboxReceiveOperator to continue pulling and dropping data block until an EOS block is observed.
     while (_isEarlyTerminated && !block.isEndOfStreamBlock()) {
-      block = getMultiConsumer().readBlockBlocking();
+      block = _multiConsumer.readBlockBlocking();
     }
     return block;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
index 05804c12d3..8949ad569a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
@@ -52,7 +52,7 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
   private final boolean _isSortOnSender;
   private final List<Object[]> _rows = new ArrayList<>();
 
-  private boolean _isSortedBlockConstructed;
+  private TransferableBlock _eosBlock;
 
   public SortedMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
       DataSchema dataSchema, List<RexExpression> collationKeys, List<Direction> collationDirections,
@@ -74,20 +74,25 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
 
   @Override
   protected TransferableBlock getNextBlock() {
-    while (true) { // loop in order to keep asking if we receive data blocks
-      TransferableBlock block = getMultiConsumer().readBlockBlocking();
+    if (_eosBlock != null) {
+      return _eosBlock;
+    }
+    // Collect all the rows from the mailbox and sort them
+    while (true) {
+      TransferableBlock block = _multiConsumer.readBlockBlocking();
       if (block.isDataBlock()) {
         _rows.addAll(block.getContainer());
       } else if (block.isErrorBlock()) {
         return block;
       } else {
         assert block.isSuccessfulEndOfStreamBlock();
-
-        if (!_isSortedBlockConstructed && !_rows.isEmpty()) {
+        if (!_rows.isEmpty()) {
+          _eosBlock = block;
+          // TODO: This might not be efficient because we are sorting all the received rows. We should use a k-way merge
+          //       when sender side is sorted.
           _rows.sort(
               new SortUtils.SortComparator(_collationKeys, _collationDirections, _collationNullDirections, _dataSchema,
                   false));
-          _isSortedBlockConstructed = true;
           return new TransferableBlock(_rows, _dataSchema, DataBlock.Type.ROW);
         } else {
           return block;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org