Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2023/04/25 00:18:53 UTC

[GitHub] [pinot] Jackie-Jiang opened a new pull request, #10681: Re-implement mailbox

Jackie-Jiang opened a new pull request, #10681:
URL: https://github.com/apache/pinot/pull/10681

   - Use `string` as the mailbox id
   - Use one single `MailboxService` implementation to handle both local and remote mails
   - Solve the dangling mailbox problem caused by late mails
     - If the mailbox is not fully consumed (no successful EOS), keep it within the cache but remove all the data blocks (use it as a tombstone)
     - Do not allow new data blocks to be added when the mailbox is cancelled or errored
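A minimal sketch of the tombstone idea in the last bullet, assuming a bounded queue per mailbox. The class `TombstoneMailbox` and its methods are hypothetical and are not Pinot's `ReceivingMailbox` API: cancelling drops every queued block but keeps the object alive (so a cache can still hold it as a tombstone), and any late `offer` is rejected instead of silently re-creating a dangling mailbox.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a receiving mailbox; data blocks are plain Strings here.
class TombstoneMailbox {
  private final BlockingQueue<String> _blocks;
  // volatile so that a cancel on the receiver thread is visible to sender threads
  private volatile boolean _cancelled;

  TombstoneMailbox(int maxPendingBlocks) {
    _blocks = new ArrayBlockingQueue<>(maxPendingBlocks);
  }

  /** Waits up to timeoutMs for space; returns false if cancelled, still full, or interrupted. */
  boolean offer(String block, long timeoutMs) {
    if (_cancelled) {
      return false; // late mail for a cancelled mailbox is rejected
    }
    try {
      if (!_blocks.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
        return false;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    if (_cancelled) {
      _blocks.clear(); // lost a race with cancel(): a tombstone must not hold blocks
      return false;
    }
    return true;
  }

  /** Drops all pending blocks but keeps this object (e.g. in a cache) as a tombstone. */
  void cancel() {
    _cancelled = true;
    _blocks.clear();
  }

  boolean isCancelled() {
    return _cancelled;
  }

  int getNumPendingBlocks() {
    return _blocks.size();
  }
}
```

Keeping the cancelled object in place, rather than removing it, is what stops a late sender from looking up the id again and getting a fresh, never-consumed mailbox.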


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177072845


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java:
##########
@@ -109,19 +97,18 @@ public List<MultiStageOperator> getChildOperators() {
   @Override
   public void close() {
     super.close();
-    releaseRemainingMailboxes();
+    cancelRemainingMailboxes();
   }
 
   @Override
   public void cancel(Throwable t) {
     super.cancel(t);
-    releaseRemainingMailboxes();
+    cancelRemainingMailboxes();

Review Comment:
   ```suggestion
       cancelRemainingMailboxes(t);
   ```
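The suggestion forwards the cancellation cause to the remaining mailboxes, so whatever observes them sees the real error rather than a generic one. A minimal sketch, assuming hypothetical `CancellableMailbox` and `ReceiveOperatorSketch` classes (not Pinot's `BaseMailboxReceiveOperator`): `close()` cancels with no cause, while `cancel(t)` passes `t` through.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mailbox that records why it was cancelled.
class CancellableMailbox {
  private boolean _cancelled;
  private Throwable _cancelCause;

  void cancel(Throwable cause) {
    _cancelled = true;
    _cancelCause = cause; // null means a normal close with no error
  }

  boolean isCancelled() {
    return _cancelled;
  }

  Throwable getCancelCause() {
    return _cancelCause;
  }
}

// Hypothetical receive operator owning the mailboxes it has not fully drained yet.
class ReceiveOperatorSketch {
  private final List<CancellableMailbox> _remainingMailboxes = new ArrayList<>();

  ReceiveOperatorSketch(List<CancellableMailbox> mailboxes) {
    _remainingMailboxes.addAll(mailboxes);
  }

  void close() {
    // Normal shutdown: there is no error to propagate.
    cancelRemainingMailboxes(null);
  }

  void cancel(Throwable t) {
    // Abnormal shutdown: forward the actual cause instead of a generic message.
    cancelRemainingMailboxes(t);
  }

  private void cancelRemainingMailboxes(Throwable t) {
    for (CancellableMailbox mailbox : _remainingMailboxes) {
      mailbox.cancel(t);
    }
    _remainingMailboxes.clear();
  }
}
```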





[GitHub] [pinot] walterddr commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177072210


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -18,55 +18,118 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import javax.annotation.Nullable;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Mailbox that's used to receive data. Ownership of the ReceivingMailbox is with the MailboxService, which is unlike
- * the {@link SendingMailbox} whose ownership lies with the {@link MailboxSendOperator}. This is because the
- * ReceivingMailbox can be initialized even before the corresponding OpChain is registered on the receiver, whereas
- * the SendingMailbox is initialized when the MailboxSendOperator is running. Also see {@link #isInitialized()}.
- *
- * @param <T> the unit of data that each {@link #receive()} call returns.
+ * the {@link SendingMailbox} whose ownership lies with the send operator. This is because the ReceivingMailbox can be
+ * initialized even before the corresponding OpChain is registered on the receiver, whereas the SendingMailbox is
+ * initialized when the send operator is running.
  */
-public interface ReceivingMailbox<T> {
+public class ReceivingMailbox {
+  public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReceivingMailbox.class);
+  private static final TransferableBlock CANCELLED_ERROR_BLOCK =
+      TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled by receiver"));

Review Comment:
   "guaranteed to be cancelled by receiver": yes, it will only be called via `opChain.cancel()`, but the OpChain might have been cancelled because of an upstream error block from the sender.
   
   To make this clearer, we should pass along the actual error that `opChain.cancel(t)` is carrying.
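The concern is that a fixed "Cancelled by receiver" message can hide the real reason, such as an upstream error from the sender. A minimal sketch of keeping the actual cause, assuming a hypothetical `ErrorHolder` that records only the first error via `AtomicReference.compareAndSet` and wraps the original throwable; this illustrates the reviewer's proposal and is not the code that was merged.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder for the error a mailbox was terminated with.
class ErrorHolder {
  private final AtomicReference<RuntimeException> _error = new AtomicReference<>();

  /** Records the cause only if no error was recorded yet; returns true if this call won. */
  boolean setError(Throwable cause) {
    // Wrap the real cause so the original stack trace and message are preserved.
    RuntimeException wrapped = new RuntimeException("Mailbox cancelled: " + cause.getMessage(), cause);
    return _error.compareAndSet(null, wrapped);
  }

  RuntimeException getError() {
    return _error.get();
  }
}
```

The trade-off is one allocation per cancellation instead of a shared constant; "first error wins" keeps the earliest, usually most relevant, cause.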
   





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177082515


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -18,80 +18,117 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.OpChain;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.pinot.query.mailbox.channel.ChannelManager;
+import org.apache.pinot.query.mailbox.channel.GrpcMailboxServer;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * Mailbox service that handles transfer for mailbox contents.
- *
- * @param <T> type of content supported by this mailbox service.
+ * Mailbox service that handles data transfer.
  */
-public interface MailboxService<T> {
+public class MailboxService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MailboxService.class);
+  private static final int DANGLING_RECEIVING_MAILBOX_EXPIRY_SECONDS = 300;
 
-  /**
-   * Starting the mailbox service.
-   */
-  void start();
+  // We use a cache to ensure the receiving mailbox are not leaked in the cases where the corresponding OpChain is
+  // either never registered or died before the sender finished sending data.
+  private final Cache<String, ReceivingMailbox> _receivingMailboxCache =
+      CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY_SECONDS, TimeUnit.SECONDS)
+          .removalListener((RemovalListener<String, ReceivingMailbox>) notification -> {
+            if (notification.wasEvicted()) {
+              int numPendingBlocks = notification.getValue().getNumPendingBlocks();
+              if (numPendingBlocks > 0) {
+                LOGGER.warn("Evicting dangling receiving mailbox: {} with {} pending blocks", notification.getKey(),
+                    numPendingBlocks);
+              }
+            }
+          }).build();
 
-  /**
-   * Shutting down the mailbox service.
-   */
-  void shutdown();
+  private final String _hostname;
+  private final int _port;
+  private final PinotConfiguration _config;
+  private final Consumer<String> _receiveMailCallback;
+  private final ChannelManager _channelManager = new ChannelManager();
+
+  private GrpcMailboxServer _grpcMailboxServer;
+
+  public MailboxService(String hostname, int port, PinotConfiguration config, Consumer<String> receiveMailCallback) {
+    _hostname = hostname;
+    _port = port;
+    _config = config;
+    _receiveMailCallback = receiveMailCallback;
+    LOGGER.info("Initialized MailboxService with hostname: {}, port: {}", hostname, port);

Review Comment:
   With the current `MailboxService`, `GrpcMailboxServer` is a component within the mailbox service. The port is also used to determine whether a sending mailbox is local or remote, so I'd prefer to keep it here.
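The reply says the service's own hostname and port decide whether a sending mailbox can hand blocks over in memory or must go through gRPC. A minimal sketch of that decision, assuming hypothetical names (`MailboxRouter`, `SendingKind`) and an exact hostname/port match; the real comparison in Pinot may differ.

```java
// Hypothetical: which transport a sending mailbox should use.
enum SendingKind { IN_MEMORY, GRPC }

// Hypothetical router holding this server's own mailbox address.
class MailboxRouter {
  private final String _hostname;
  private final int _port;

  MailboxRouter(String hostname, int port) {
    _hostname = hostname;
    _port = port;
  }

  /** A receiver at the same hostname and mailbox port is treated as local to this process. */
  SendingKind chooseSendingKind(String receiverHostname, int receiverPort) {
    if (_hostname.equals(receiverHostname) && _port == receiverPort) {
      return SendingKind.IN_MEMORY;
    }
    return SendingKind.GRPC;
  }
}
```

Comparing the port as well as the hostname matters when several servers share one host, since each would listen on its own mailbox port.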





[GitHub] [pinot] walterddr merged pull request #10681: [multistage]re-implement mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr merged PR #10681:
URL: https://github.com/apache/pinot/pull/10681




[GitHub] [pinot] walterddr commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177058005


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -18,80 +18,117 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.OpChain;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.pinot.query.mailbox.channel.ChannelManager;
+import org.apache.pinot.query.mailbox.channel.GrpcMailboxServer;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * Mailbox service that handles transfer for mailbox contents.
- *
- * @param <T> type of content supported by this mailbox service.
+ * Mailbox service that handles data transfer.
  */
-public interface MailboxService<T> {
+public class MailboxService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MailboxService.class);
+  private static final int DANGLING_RECEIVING_MAILBOX_EXPIRY_SECONDS = 300;
 
-  /**
-   * Starting the mailbox service.
-   */
-  void start();
+  // We use a cache to ensure the receiving mailbox are not leaked in the cases where the corresponding OpChain is
+  // either never registered or died before the sender finished sending data.
+  private final Cache<String, ReceivingMailbox> _receivingMailboxCache =
+      CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY_SECONDS, TimeUnit.SECONDS)
+          .removalListener((RemovalListener<String, ReceivingMailbox>) notification -> {
+            if (notification.wasEvicted()) {
+              int numPendingBlocks = notification.getValue().getNumPendingBlocks();
+              if (numPendingBlocks > 0) {
+                LOGGER.warn("Evicting dangling receiving mailbox: {} with {} pending blocks", notification.getKey(),
+                    numPendingBlocks);
+              }
+            }
+          }).build();
 
-  /**
-   * Shutting down the mailbox service.
-   */
-  void shutdown();
+  private final String _hostname;
+  private final int _port;
+  private final PinotConfiguration _config;
+  private final Consumer<String> _receiveMailCallback;
+  private final ChannelManager _channelManager = new ChannelManager();
+
+  private GrpcMailboxServer _grpcMailboxServer;
+
+  public MailboxService(String hostname, int port, PinotConfiguration config, Consumer<String> receiveMailCallback) {
+    _hostname = hostname;
+    _port = port;
+    _config = config;
+    _receiveMailCallback = receiveMailCallback;
+    LOGGER.info("Initialized MailboxService with hostname: {}, port: {}", hostname, port);

Review Comment:
   Following the above: move this log back to `GrpcMailboxServer`, since the port only identifies the gRPC server channel port.
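The `_receivingMailboxCache` in the diff above relies on Guava's `expireAfterAccess` plus a removal listener to drop dangling receiving mailboxes (300 seconds via `DANGLING_RECEIVING_MAILBOX_EXPIRY_SECONDS`). To show the mechanism without Guava, here is a minimal stdlib sketch with an injectable clock and an explicit `cleanUp()` sweep; `ExpiringMailboxRegistry` and its methods are hypothetical. Guava performs eviction as a side effect of other cache operations, so the explicit sweep is a simplification.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical registry that evicts entries not accessed within expiryMs.
class ExpiringMailboxRegistry<V> {
  private static final class TimedEntry<T> {
    final T _value;
    long _lastAccessMs;

    TimedEntry(T value, long nowMs) {
      _value = value;
      _lastAccessMs = nowMs;
    }
  }

  private final Map<String, TimedEntry<V>> _entries = new LinkedHashMap<>();
  private final List<String> _evictedIds = new ArrayList<>();
  private final long _expiryMs;
  private final LongSupplier _clock;

  ExpiringMailboxRegistry(long expiryMs, LongSupplier clock) {
    _expiryMs = expiryMs;
    _clock = clock;
  }

  /** Returns the existing value or creates one; either way refreshes the access time. */
  synchronized V get(String id, Function<String, V> loader) {
    long now = _clock.getAsLong();
    TimedEntry<V> entry = _entries.computeIfAbsent(id, k -> new TimedEntry<V>(loader.apply(k), now));
    entry._lastAccessMs = now;
    return entry._value;
  }

  /** Removes entries idle for at least expiryMs and records their ids. */
  synchronized void cleanUp() {
    long now = _clock.getAsLong();
    Iterator<Map.Entry<String, TimedEntry<V>>> it = _entries.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, TimedEntry<V>> e = it.next();
      if (now - e.getValue()._lastAccessMs >= _expiryMs) {
        it.remove();
        _evictedIds.add(e.getKey()); // a removal listener could warn about pending blocks here
      }
    }
  }

  synchronized int size() {
    return _entries.size();
  }

  synchronized List<String> getEvictedIds() {
    return new ArrayList<>(_evictedIds);
  }
}
```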





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177087610


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -18,59 +18,49 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
-  private final MailboxIdentifier _mailboxId;
-  private final Supplier<InMemoryTransferStream> _transferStreamProvider;
-  private final Consumer<MailboxIdentifier> _gotMailCallback;
+public class InMemorySendingMailbox implements SendingMailbox {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class);
 
-  private InMemoryTransferStream _transferStream;
+  private final String _id;
+  private final MailboxService _mailboxService;
+  private final long _deadlineMs;
 
-  public InMemorySendingMailbox(MailboxIdentifier mailboxId, Supplier<InMemoryTransferStream> transferStreamProvider,
-      Consumer<MailboxIdentifier> gotMailCallback) {
-    _mailboxId = mailboxId;
-    _transferStreamProvider = transferStreamProvider;
-    _gotMailCallback = gotMailCallback;
-  }
+  private ReceivingMailbox _receivingMailbox;
 
-  @Override
-  public void send(TransferableBlock data)
-      throws Exception {
-    if (!isInitialized()) {
-      initialize();
-    }
-    _transferStream.send(data);
-    _gotMailCallback.accept(_mailboxId);
+  public InMemorySendingMailbox(String id, MailboxService mailboxService, long deadlineMs) {
+    _id = id;
+    _mailboxService = mailboxService;
+    _deadlineMs = deadlineMs;
   }
 
   @Override
-  public void complete()
-      throws Exception {
-    _transferStream.complete();
-    _gotMailCallback.accept(_mailboxId);
+  public void send(TransferableBlock block) {
+    if (_receivingMailbox == null) {
+      _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
+    }
+    long timeoutMs = _deadlineMs - System.currentTimeMillis();
+    if (!_receivingMailbox.offer(block, timeoutMs)) {
+      throw new RuntimeException(String.format("Failed to offer block into mailbox: %s within: %dms", _id, timeoutMs));

Review Comment:
   Technically it is not really an IO exception, as there is no IO involved :-P. Similarly, we throw `IllegalStateException` (which extends `RuntimeException`) in `GrpcSendingMailbox`.
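In the `InMemorySendingMailbox.send` diff above, the time remaining until the deadline bounds how long the sender blocks on a full receiving mailbox, and failure surfaces as an unchecked exception. A minimal stdlib sketch of that pattern, assuming a hypothetical `DeadlineSender` over a `BlockingQueue` and an injectable clock; it throws `IllegalStateException`, in line with the comment, and clamps the remaining time at zero.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical in-memory sender bounded by an absolute deadline.
class DeadlineSender {
  private final String _id;
  private final BlockingQueue<String> _receiver;
  private final long _deadlineMs;
  private final LongSupplier _clock;

  DeadlineSender(String id, BlockingQueue<String> receiver, long deadlineMs, LongSupplier clock) {
    _id = id;
    _receiver = receiver;
    _deadlineMs = deadlineMs;
    _clock = clock;
  }

  void send(String block) {
    // Never wait past the deadline; a zero wait makes offer() return immediately.
    long timeoutMs = Math.max(0L, _deadlineMs - _clock.getAsLong());
    boolean offered;
    try {
      offered = _receiver.offer(block, timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while offering block into mailbox: " + _id, e);
    }
    if (!offered) {
      throw new IllegalStateException(
          String.format("Failed to offer block into mailbox: %s within: %dms", _id, timeoutMs));
    }
  }
}
```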





[GitHub] [pinot] codecov-commenter commented on pull request #10681: Re-implement mailbox

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #10681:
URL: https://github.com/apache/pinot/pull/10681#issuecomment-1521406124

   ## [Codecov](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#10681](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e92787b) into [master](https://codecov.io/gh/apache/pinot/commit/96d34fd280b154c9d6f8435ac787b7bebd1e631e?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (96d34fd) will **decrease** coverage by `2.55%`.
   > The diff coverage is `86.14%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #10681      +/-   ##
   ============================================
   - Coverage     70.29%   67.75%   -2.55%     
   + Complexity     6514     5964     -550     
   ============================================
     Files          2108     1575     -533     
     Lines        113904    82683   -31221     
     Branches      17192    13041    -4151     
   ============================================
   - Hits          80065    56019   -24046     
   + Misses        28229    22757    -5472     
   + Partials       5610     3907    -1703     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `67.75% <86.14%> (-0.08%)` | :arrow_down: |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...he/pinot/query/mailbox/channel/ChannelManager.java](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9jaGFubmVsL0NoYW5uZWxNYW5hZ2VyLmphdmE=) | `100.00% <ø> (ø)` | |
   | [...y/runtime/operator/exchange/BroadcastExchange.java](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9leGNoYW5nZS9Ccm9hZGNhc3RFeGNoYW5nZS5qYXZh) | `100.00% <ø> (ø)` | |
   | [.../query/runtime/operator/exchange/HashExchange.java](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9leGNoYW5nZS9IYXNoRXhjaGFuZ2UuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...uery/runtime/operator/exchange/RandomExchange.java](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9leGNoYW5nZS9SYW5kb21FeGNoYW5nZS5qYXZh) | `100.00% <ø> (ø)` | |
   | [...y/runtime/operator/exchange/SingletonExchange.java](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9leGNoYW5nZS9TaW5nbGV0b25FeGNoYW5nZS5qYXZh) | `100.00% <ø> (ø)` | |
   | [...ot/query/runtime/plan/OpChainExecutionContext.java](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL09wQ2hhaW5FeGVjdXRpb25Db250ZXh0LmphdmE=) | `96.15% <ø> (ø)` | |
   | [.../runtime/plan/server/ServerPlanRequestContext.java](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL3NlcnZlci9TZXJ2ZXJQbGFuUmVxdWVzdENvbnRleHQuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...t/query/runtime/plan/ServerRequestPlanVisitor.java](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL1NlcnZlclJlcXVlc3RQbGFuVmlzaXRvci5qYXZh) | `86.20% <72.72%> (-0.12%)` | :arrow_down: |
   | [...g/apache/pinot/query/mailbox/ReceivingMailbox.java](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9SZWNlaXZpbmdNYWlsYm94LmphdmE=) | `73.80% <73.80%> (ø)` | |
   | [...ot/query/runtime/executor/RoundRobinScheduler.java](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9leGVjdXRvci9Sb3VuZFJvYmluU2NoZWR1bGVyLmphdmE=) | `86.74% <75.00%> (-6.12%)` | :arrow_down: |
   | ... and [18 more](https://codecov.io/gh/apache/pinot/pull/10681?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ... and [778 files with indirect coverage changes](https://codecov.io/gh/apache/pinot/pull/10681/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177100281


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -18,55 +18,118 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import javax.annotation.Nullable;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Mailbox that's used to receive data. Ownership of the ReceivingMailbox is with the MailboxService, which is unlike
- * the {@link SendingMailbox} whose ownership lies with the {@link MailboxSendOperator}. This is because the
- * ReceivingMailbox can be initialized even before the corresponding OpChain is registered on the receiver, whereas
- * the SendingMailbox is initialized when the MailboxSendOperator is running. Also see {@link #isInitialized()}.
- *
- * @param <T> the unit of data that each {@link #receive()} call returns.
+ * the {@link SendingMailbox} whose ownership lies with the send operator. This is because the ReceivingMailbox can be
+ * initialized even before the corresponding OpChain is registered on the receiver, whereas the SendingMailbox is
+ * initialized when the send operator is running.
  */
-public interface ReceivingMailbox<T> {
+public class ReceivingMailbox {
+  public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReceivingMailbox.class);
+  private static final TransferableBlock CANCELLED_ERROR_BLOCK =
+      TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled by receiver"));

Review Comment:
   This is used as a tombstone only and won't be returned or logged in any way (the execution flow is already done, and this is only for releasing resources), so a shared reference can reduce the memory footprint.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177057310


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -18,80 +18,117 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.OpChain;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.pinot.query.mailbox.channel.ChannelManager;
+import org.apache.pinot.query.mailbox.channel.GrpcMailboxServer;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * Mailbox service that handles transfer for mailbox contents.
- *
- * @param <T> type of content supported by this mailbox service.
+ * Mailbox service that handles data transfer.
  */
-public interface MailboxService<T> {
+public class MailboxService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MailboxService.class);
+  private static final int DANGLING_RECEIVING_MAILBOX_EXPIRY_SECONDS = 300;
 
-  /**
-   * Starting the mailbox service.
-   */
-  void start();
+  // We use a cache to ensure the receiving mailbox are not leaked in the cases where the corresponding OpChain is
+  // either never registered or died before the sender finished sending data.
+  private final Cache<String, ReceivingMailbox> _receivingMailboxCache =
+      CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY_SECONDS, TimeUnit.SECONDS)
+          .removalListener((RemovalListener<String, ReceivingMailbox>) notification -> {
+            if (notification.wasEvicted()) {
+              int numPendingBlocks = notification.getValue().getNumPendingBlocks();
+              if (numPendingBlocks > 0) {
+                LOGGER.warn("Evicting dangling receiving mailbox: {} with {} pending blocks", notification.getKey(),
+                    numPendingBlocks);
+              }
+            }
+          }).build();
 
-  /**
-   * Shutting down the mailbox service.
-   */
-  void shutdown();
+  private final String _hostname;
+  private final int _port;
+  private final PinotConfiguration _config;
+  private final Consumer<String> _receiveMailCallback;
+  private final ChannelManager _channelManager = new ChannelManager();
+
+  private GrpcMailboxServer _grpcMailboxServer;
+
+  public MailboxService(String hostname, int port, PinotConfiguration config, Consumer<String> receiveMailCallback) {
+    _hostname = hostname;
+    _port = port;
+    _config = config;

Review Comment:
   For historic reasons (not introduced by this PR):
   let's get rid of the `port` argument here. It should be nested inside `_config`, and `GrpcMailboxServer` already reads it from `_config` anyway.





[GitHub] [pinot] walterddr commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177051695


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -18,55 +18,118 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import javax.annotation.Nullable;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Mailbox that's used to receive data. Ownership of the ReceivingMailbox is with the MailboxService, which is unlike
- * the {@link SendingMailbox} whose ownership lies with the {@link MailboxSendOperator}. This is because the
- * ReceivingMailbox can be initialized even before the corresponding OpChain is registered on the receiver, whereas
- * the SendingMailbox is initialized when the MailboxSendOperator is running. Also see {@link #isInitialized()}.
- *
- * @param <T> the unit of data that each {@link #receive()} call returns.
+ * the {@link SendingMailbox} whose ownership lies with the send operator. This is because the ReceivingMailbox can be
+ * initialized even before the corresponding OpChain is registered on the receiver, whereas the SendingMailbox is
+ * initialized when the send operator is running.
  */
-public interface ReceivingMailbox<T> {
+public class ReceivingMailbox {
+  public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReceivingMailbox.class);
+  private static final TransferableBlock CANCELLED_ERROR_BLOCK =
+      TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled by receiver"));

Review Comment:
   ```suggestion
         TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled Stream"));
   ```
   "Cancelled by receiver" is misleading: it doesn't actually guarantee that the call comes from the receiver rather than the sender.





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177104234


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java:
##########
@@ -109,19 +97,18 @@ public List<MultiStageOperator> getChildOperators() {
   @Override
   public void close() {
     super.close();
-    releaseRemainingMailboxes();
+    cancelRemainingMailboxes();
   }
 
   @Override
   public void cancel(Throwable t) {
     super.cancel(t);
-    releaseRemainingMailboxes();
+    cancelRemainingMailboxes();

Review Comment:
   It is currently called from both `close()` and `cancel(Throwable t)`. It seems I could remove the call under `close()` for now, but if we support any kind of early termination in the future, `cancel` won't always be called.
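A minimal sketch of why calling the cleanup from both paths is safe, assuming cancellation is made idempotent by forgetting mailboxes once they have been cancelled. `SketchMailbox` and `SketchReceiveOperator` are hypothetical simplifications, not the classes in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal mailbox that records how often it was cancelled.
class SketchMailbox {
  private boolean _cancelled;
  private int _cancelCalls;

  void cancel() {
    _cancelCalls++;
    _cancelled = true;
  }

  boolean isCancelled() {
    return _cancelled;
  }

  int getCancelCalls() {
    return _cancelCalls;
  }
}

// Hypothetical receive operator that releases its mailboxes from both close() and cancel(Throwable),
// so cleanup still happens if a future early-termination path only calls close().
class SketchReceiveOperator {
  private final List<SketchMailbox> _remaining;

  SketchReceiveOperator(List<SketchMailbox> mailboxes) {
    _remaining = new ArrayList<>(mailboxes);
  }

  void close() {
    cancelRemainingMailboxes();
  }

  void cancel(Throwable t) {
    cancelRemainingMailboxes();
  }

  // Cancels every mailbox not yet released, then forgets it so a second call is a no-op.
  private void cancelRemainingMailboxes() {
    for (SketchMailbox mailbox : _remaining) {
      mailbox.cancel();
    }
    _remaining.clear();
  }
}
```

In this sketch, an operator that is cancelled and later closed cancels each mailbox exactly once.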





[GitHub] [pinot] walterddr commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177044022


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -18,59 +18,49 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
-  private final MailboxIdentifier _mailboxId;
-  private final Supplier<InMemoryTransferStream> _transferStreamProvider;
-  private final Consumer<MailboxIdentifier> _gotMailCallback;
+public class InMemorySendingMailbox implements SendingMailbox {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class);
 
-  private InMemoryTransferStream _transferStream;
+  private final String _id;
+  private final MailboxService _mailboxService;
+  private final long _deadlineMs;
 
-  public InMemorySendingMailbox(MailboxIdentifier mailboxId, Supplier<InMemoryTransferStream> transferStreamProvider,
-      Consumer<MailboxIdentifier> gotMailCallback) {
-    _mailboxId = mailboxId;
-    _transferStreamProvider = transferStreamProvider;
-    _gotMailCallback = gotMailCallback;
-  }
+  private ReceivingMailbox _receivingMailbox;
 
-  @Override
-  public void send(TransferableBlock data)
-      throws Exception {
-    if (!isInitialized()) {
-      initialize();
-    }
-    _transferStream.send(data);
-    _gotMailCallback.accept(_mailboxId);
+  public InMemorySendingMailbox(String id, MailboxService mailboxService, long deadlineMs) {
+    _id = id;
+    _mailboxService = mailboxService;
+    _deadlineMs = deadlineMs;
   }
 
   @Override
-  public void complete()
-      throws Exception {
-    _transferStream.complete();
-    _gotMailCallback.accept(_mailboxId);
+  public void send(TransferableBlock block) {
+    if (_receivingMailbox == null) {
+      _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
+    }
+    long timeoutMs = _deadlineMs - System.currentTimeMillis();
+    if (!_receivingMailbox.offer(block, timeoutMs)) {
+      throw new RuntimeException(String.format("Failed to offer block into mailbox: %s within: %dms", _id, timeoutMs));

Review Comment:
   Shouldn't we call `cancel` with the error here?





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1176761684


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java:
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.query.runtime.operator.OpChainId;
+
+
+// TODO: De-couple mailbox id from query information
+public class MailboxIdUtils {

Review Comment:
   We want to decouple the mailbox id from query information (the mailbox id should just be a UUID), and that is why we want to change it from an interface to a plain string. Currently we still rely on some information in the mailbox id to create the `OpChainId`, so I added this util class to limit the scope of the change. Once we decouple `OpChainId` from the mailbox id, we should delete this util and use the mailbox id only as an identifier.
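A sketch of the interim approach described here, assuming a hypothetical delimited id format (the real format used by `MailboxIdUtils` is not shown in this hunk). `MailboxIdSketchUtils` is hypothetical; it illustrates the coupling the TODO wants to remove, since the receiver's OpChain identity has to be parsed back out of the id string.

```java
// Hypothetical id format, for illustration only:
//   "<requestId>|<senderStageId>|<senderWorkerId>|<receiverStageId>|<receiverWorkerId>"
final class MailboxIdSketchUtils {
  private static final String SEPARATOR = "|";

  private MailboxIdSketchUtils() {
  }

  static String toMailboxId(long requestId, int senderStageId, int senderWorkerId, int receiverStageId,
      int receiverWorkerId) {
    return requestId + SEPARATOR + senderStageId + SEPARATOR + senderWorkerId + SEPARATOR + receiverStageId
        + SEPARATOR + receiverWorkerId;
  }

  // Returns {requestId, receiverStageId, receiverWorkerId}, i.e. the parts a receiver-side OpChain id
  // would be built from in this sketch. This parsing is exactly the coupling to query information.
  static long[] toReceiverOpChainParts(String mailboxId) {
    String[] parts = mailboxId.split("\\|");
    if (parts.length != 5) {
      throw new IllegalArgumentException("Malformed mailbox id: " + mailboxId);
    }
    return new long[]{Long.parseLong(parts[0]), Long.parseLong(parts[3]), Long.parseLong(parts[4])};
  }
}
```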



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177059420


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -18,59 +18,49 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
-  private final MailboxIdentifier _mailboxId;
-  private final Supplier<InMemoryTransferStream> _transferStreamProvider;
-  private final Consumer<MailboxIdentifier> _gotMailCallback;
+public class InMemorySendingMailbox implements SendingMailbox {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class);
 
-  private InMemoryTransferStream _transferStream;
+  private final String _id;
+  private final MailboxService _mailboxService;
+  private final long _deadlineMs;
 
-  public InMemorySendingMailbox(MailboxIdentifier mailboxId, Supplier<InMemoryTransferStream> transferStreamProvider,
-      Consumer<MailboxIdentifier> gotMailCallback) {
-    _mailboxId = mailboxId;
-    _transferStreamProvider = transferStreamProvider;
-    _gotMailCallback = gotMailCallback;
-  }
+  private ReceivingMailbox _receivingMailbox;
 
-  @Override
-  public void send(TransferableBlock data)
-      throws Exception {
-    if (!isInitialized()) {
-      initialize();
-    }
-    _transferStream.send(data);
-    _gotMailCallback.accept(_mailboxId);
+  public InMemorySendingMailbox(String id, MailboxService mailboxService, long deadlineMs) {
+    _id = id;
+    _mailboxService = mailboxService;
+    _deadlineMs = deadlineMs;
   }
 
   @Override
-  public void complete()
-      throws Exception {
-    _transferStream.complete();
-    _gotMailCallback.accept(_mailboxId);
+  public void send(TransferableBlock block) {
+    if (_receivingMailbox == null) {
+      _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
+    }
+    long timeoutMs = _deadlineMs - System.currentTimeMillis();
+    if (!_receivingMailbox.offer(block, timeoutMs)) {
+      throw new RuntimeException(String.format("Failed to offer block into mailbox: %s within: %dms", _id, timeoutMs));

Review Comment:
   IMO, `send()` should throw an exception (similar to `GrpcSendingMailbox.send()` when the receiver has already completed or errored out). After it throws, the caller will call `cancel()` to release the resources.
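A minimal sketch of the proposed contract, assuming simplified hypothetical types: `send()` only reports failure by throwing, and the caller, which owns the mailbox, is responsible for calling `cancel()`. Blocks are plain strings and a bounded queue stands in for the receiving side; `SketchSendingMailbox`, `SketchInMemorySendingMailbox`, and `SketchSender` are not Pinot classes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified sending-mailbox contract.
interface SketchSendingMailbox {
  void send(String block) throws Exception;

  void cancel(Throwable t);
}

// send() fails fast by throwing when the block cannot be delivered before the deadline;
// it does not clean up by itself.
class SketchInMemorySendingMailbox implements SketchSendingMailbox {
  private final BlockingQueue<String> _queue;
  private final long _deadlineMs;
  private Throwable _cancelCause;

  SketchInMemorySendingMailbox(int capacity, long deadlineMs) {
    _queue = new ArrayBlockingQueue<>(capacity);
    _deadlineMs = deadlineMs;
  }

  @Override
  public void send(String block) throws Exception {
    long timeoutMs = _deadlineMs - System.currentTimeMillis();
    if (timeoutMs < 0) {
      throw new RuntimeException("Deadline already passed");
    }
    if (!_queue.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
      throw new RuntimeException("Failed to offer block within: " + timeoutMs + "ms");
    }
  }

  @Override
  public void cancel(Throwable t) {
    _cancelCause = t;
    _queue.clear();
  }

  Throwable getCancelCause() {
    return _cancelCause;
  }

  int size() {
    return _queue.size();
  }
}

// The caller owns cleanup: on any send failure it cancels the mailbox once.
final class SketchSender {
  private SketchSender() {
  }

  static boolean sendAll(SketchSendingMailbox mailbox, String... blocks) {
    try {
      for (String block : blocks) {
        mailbox.send(block);
      }
      return true;
    } catch (Exception e) {
      mailbox.cancel(e);
      return false;
    }
  }
}
```

Keeping cleanup in one place (the caller's catch block) avoids `send()` and its caller both trying to cancel the same mailbox.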





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177056586


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -18,55 +18,118 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import javax.annotation.Nullable;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Mailbox that's used to receive data. Ownership of the ReceivingMailbox is with the MailboxService, which is unlike
- * the {@link SendingMailbox} whose ownership lies with the {@link MailboxSendOperator}. This is because the
- * ReceivingMailbox can be initialized even before the corresponding OpChain is registered on the receiver, whereas
- * the SendingMailbox is initialized when the MailboxSendOperator is running. Also see {@link #isInitialized()}.
- *
- * @param <T> the unit of data that each {@link #receive()} call returns.
+ * the {@link SendingMailbox} whose ownership lies with the send operator. This is because the ReceivingMailbox can be
+ * initialized even before the corresponding OpChain is registered on the receiver, whereas the SendingMailbox is
+ * initialized when the send operator is running.
  */
-public interface ReceivingMailbox<T> {
+public class ReceivingMailbox {
+  public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReceivingMailbox.class);
+  private static final TransferableBlock CANCELLED_ERROR_BLOCK =
+      TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled by receiver"));

Review Comment:
   It is guaranteed to be cancelled by the receiver. This is used just as a tombstone when `cancel()` is invoked.
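A simplified sketch of the tombstone semantics described here and in the PR description, assuming string blocks with a hypothetical `ERROR:` prefix for error blocks. `SketchReceivingMailbox` is not the PR's class: after `cancel()`, pending data blocks are dropped, `poll()` keeps returning the tombstone, and late `offer()` calls are rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Simplified sketch; blocks are plain strings and the "ERROR:" prefix marks an error block (an assumption).
class SketchReceivingMailbox {
  static final String CANCELLED_TOMBSTONE = "ERROR: Cancelled by receiver";

  private final BlockingQueue<String> _blocks = new ArrayBlockingQueue<>(5);
  private final AtomicReference<String> _errorBlock = new AtomicReference<>();

  // Late blocks are rejected once the mailbox is cancelled or errored out.
  boolean offer(String block) {
    if (_errorBlock.get() != null) {
      return false;
    }
    return _blocks.offer(block);
  }

  // Only the first error wins. Pending data blocks are dropped, but the mailbox object itself
  // can stay around (e.g. in a cache) so late senders still find it and get rejected.
  void setErrorBlock(String errorBlock) {
    if (_errorBlock.compareAndSet(null, errorBlock)) {
      _blocks.clear();
    }
  }

  void cancel() {
    setErrorBlock(CANCELLED_TOMBSTONE);
  }

  // The error block, if any, takes precedence over data blocks.
  String poll() {
    String error = _errorBlock.get();
    return error != null ? error : _blocks.poll();
  }
}
```

The `offer()` in the PR also re-checks the error block after enqueueing, to cover a cancel that races with an in-flight offer; the sketch leaves that out for brevity.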





[GitHub] [pinot] jonasgeiregat commented on a diff in pull request #10681: Re-implement mailbox

Posted by "jonasgeiregat (via GitHub)" <gi...@apache.org>.
jonasgeiregat commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1176390654


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java:
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.query.runtime.operator.OpChainId;
+
+
+// TODO: De-couple mailbox id from query information
+public class MailboxIdUtils {

Review Comment:
   It feels more object-oriented to encapsulate this logic in a dedicated `MailboxId` object instead of putting it in a utils class.
   
   ```java
   MailboxId id = MailboxId.builder()
       .withRequestId(requestId)
       // etc ..
       .build();
   id.toOpChainId();
   ```
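One way the suggested value object could look, as a sketch only: `SketchMailboxId`, `SketchOpChainId`, and the chosen fields are hypothetical and cover only what `toOpChainId()` needs in this example.

```java
import java.util.Objects;

// Hypothetical OpChain identity: (requestId, stageId, workerId).
final class SketchOpChainId {
  private final long _requestId;
  private final int _stageId;
  private final int _workerId;

  SketchOpChainId(long requestId, int stageId, int workerId) {
    _requestId = requestId;
    _stageId = stageId;
    _workerId = workerId;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SketchOpChainId)) {
      return false;
    }
    SketchOpChainId that = (SketchOpChainId) o;
    return _requestId == that._requestId && _stageId == that._stageId && _workerId == that._workerId;
  }

  @Override
  public int hashCode() {
    return Objects.hash(_requestId, _stageId, _workerId);
  }
}

// Hypothetical value object: typed fields instead of string parsing, and the OpChain id
// derivation lives next to the data it depends on.
final class SketchMailboxId {
  private final long _requestId;
  private final int _receiverStageId;
  private final int _receiverWorkerId;

  private SketchMailboxId(Builder builder) {
    _requestId = builder._requestId;
    _receiverStageId = builder._receiverStageId;
    _receiverWorkerId = builder._receiverWorkerId;
  }

  static Builder builder() {
    return new Builder();
  }

  // In this sketch, the receiving OpChain is identified by the request plus the receiver's stage and worker.
  SketchOpChainId toOpChainId() {
    return new SketchOpChainId(_requestId, _receiverStageId, _receiverWorkerId);
  }

  static final class Builder {
    private long _requestId;
    private int _receiverStageId;
    private int _receiverWorkerId;

    Builder withRequestId(long requestId) {
      _requestId = requestId;
      return this;
    }

    Builder withReceiverStageId(int receiverStageId) {
      _receiverStageId = receiverStageId;
      return this;
    }

    Builder withReceiverWorkerId(int receiverWorkerId) {
      _receiverWorkerId = receiverWorkerId;
      return this;
    }

    SketchMailboxId build() {
      return new SketchMailboxId(this);
    }
  }
}
```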
   





[GitHub] [pinot] walterddr commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1176846754


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java:
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.query.runtime.operator.OpChainId;
+
+
+// TODO: De-couple mailbox id from query information
+public class MailboxIdUtils {

Review Comment:
   Put it this way: mailboxID (str) --> opChainID (str) is currently a 1:1 mapping.
   This assumption won't hold going forward, so we will need other methods to derive the associated opChainIDs (and most likely the responsibility will shift to whichever class needs to derive them).
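A sketch of the direction described here, assuming the mailbox id becomes an opaque UUID and the class that needs OpChain ids keeps its own explicit mapping. `SketchMailboxRegistry` is hypothetical, and OpChain ids are plain strings for brevity; the point is that the mapping can be many-to-one instead of being derived from the id.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry owned by whichever class needs to map mailboxes back to OpChains.
final class SketchMailboxRegistry {
  // Several mailboxes may belong to the same OpChain, so the mapping is stored, not parsed.
  private final Map<String, String> _opChainIdByMailboxId = new ConcurrentHashMap<>();

  // Creates an opaque mailbox id that carries no query information.
  String newMailbox(String opChainId) {
    String mailboxId = UUID.randomUUID().toString();
    _opChainIdByMailboxId.put(mailboxId, opChainId);
    return mailboxId;
  }

  String getOpChainId(String mailboxId) {
    String opChainId = _opChainIdByMailboxId.get(mailboxId);
    if (opChainId == null) {
      throw new IllegalArgumentException("Unknown mailbox: " + mailboxId);
    }
    return opChainId;
  }
}
```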





[GitHub] [pinot] walterddr commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177039488


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -40,86 +36,67 @@
 /**
  * gRPC implementation of the {@link SendingMailbox}. The gRPC stream is created on the first call to {@link #send}.
  */
-public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
+public class GrpcSendingMailbox implements SendingMailbox {
   private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class);
 
-  private final String _mailboxId;
-  private final Function<Long, StreamObserver<MailboxContent>> _mailboxContentStreamObserverSupplier;
-  private final MailboxStatusStreamObserver _statusObserver;
+  private final String _id;
+  private final ChannelManager _channelManager;
+  private final String _hostname;
+  private final int _port;
   private final long _deadlineMs;
-  private final AtomicBoolean _initialized = new AtomicBoolean(false);
+  private final MailboxStatusObserver _statusObserver = new MailboxStatusObserver();
 
-  private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
-  private TransferableBlock _errorBlock;
+  private StreamObserver<MailboxContent> _contentObserver;
 
-  public GrpcSendingMailbox(MailboxIdentifier mailboxId, MailboxStatusStreamObserver statusObserver,
-      Function<Long, StreamObserver<MailboxContent>> contentStreamObserverSupplier, long deadlineMs) {
-    _mailboxId = mailboxId.toString();
-    _mailboxContentStreamObserverSupplier = contentStreamObserverSupplier;
-    _statusObserver = statusObserver;
+  public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostname, int port, long deadlineMs) {
+    _id = id;
+    _channelManager = channelManager;
+    _hostname = hostname;
+    _port = port;
     _deadlineMs = deadlineMs;
-    _errorBlock = null;
   }
 
   @Override
   public void send(TransferableBlock block)
-      throws Exception {
-    if (!_initialized.get()) {
-      open();
+      throws IOException {
+    if (_contentObserver == null) {
+      _contentObserver = getContentObserver();
     }
-    Preconditions.checkState(!_statusObserver.isFinished() || _errorBlock != null,
-        "Called send when stream is already closed for mailbox=%s", _mailboxId);
-    MailboxContent data = toMailboxContent(block.getDataBlock());
-    _mailboxContentStreamObserver.onNext(data);
+    Preconditions.checkState(!_statusObserver.isFinished(), "Mailbox: %s is already closed", _id);
+    _contentObserver.onNext(toMailboxContent(block));
   }
 
   @Override
-  public void complete()
-      throws Exception {
-    // TODO: should wait for _mailboxContentStreamObserver.onNext() finish before calling onComplete().
-    _mailboxContentStreamObserver.onCompleted();
-  }
-
-  @Override
-  public boolean isInitialized() {
-    return _initialized.get();
+  public void complete() {
+    _contentObserver.onCompleted();
   }
 
   @Override
   public void cancel(Throwable t) {
-    if (_initialized.get() && !_statusObserver.isFinished()) {
-      LOGGER.warn("GrpcSendingMailbox={} cancelling stream", _mailboxId);
+    if (!_statusObserver.isFinished()) {
+      LOGGER.debug("Cancelling mailbox: {}", _id);
+      if (_contentObserver == null) {
+        _contentObserver = getContentObserver();
+      }
       try {
-        RuntimeException e = new RuntimeException("Cancelled by the sender");
-        _errorBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
-        _mailboxContentStreamObserver.onNext(toMailboxContent(_errorBlock.getDataBlock()));
-        _mailboxContentStreamObserver.onError(Status.fromThrowable(e).asRuntimeException());
-      } catch (Exception e) {
-        // TODO: We don't necessarily need to log this since this is relatively quite likely to happen. Logging this
-        //  anyways as info for now so we can see how frequently this happens.
-        LOGGER.info("Unexpected error issuing onError to MailboxContentStreamObserver: {}", e.getMessage());
+        // NOTE: DO NOT use onError() because it will terminate the stream, and receiver might not get the callback
+        _contentObserver.onNext(toMailboxContent(
+            TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled by sender", t))));
+        _contentObserver.onCompleted();
+      } catch (Exception ignored) {
+        // Exception can be thrown if the stream is already closed, so we simply ignore it

Review Comment:
   Should we at least log it? For example, if `onNext` is called on a `contentObserver` that was not opened properly (we won't know until `onNext` is called), we should log the error, yes?
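A sketch of the logging suggestion, assuming a hypothetical `SketchContentObserver` in place of gRPC's `StreamObserver<MailboxContent>` and an injected `Consumer<String>` in place of the SLF4J logger. Whether the level should be warn or debug is a judgment call, since an already-closed stream is the expected case.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for gRPC's StreamObserver<MailboxContent>; blocks are plain strings here.
interface SketchContentObserver {
  void onNext(String content);

  void onCompleted();
}

class SketchGrpcSendingMailbox {
  private final String _id;
  private final SketchContentObserver _contentObserver;
  private final Consumer<String> _warnLog; // stand-in for LOGGER.warn(...)

  SketchGrpcSendingMailbox(String id, SketchContentObserver contentObserver, Consumer<String> warnLog) {
    _id = id;
    _contentObserver = contentObserver;
    _warnLog = warnLog;
  }

  void cancel(Throwable t) {
    try {
      // Send an error block and complete the stream instead of calling onError(), as the NOTE in the diff explains.
      _contentObserver.onNext("ERROR: Cancelled by sender: " + t.getMessage());
      _contentObserver.onCompleted();
    } catch (Exception e) {
      // The stream may already be closed (expected) or may never have been opened properly
      // (unexpected); either way, leave a trace instead of swallowing the exception silently.
      _warnLog.accept("Caught exception while cancelling mailbox: " + _id + ": " + e);
    }
  }
}
```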





[GitHub] [pinot] walterddr commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177074193


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -18,55 +18,118 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import javax.annotation.Nullable;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Mailbox that's used to receive data. Ownership of the ReceivingMailbox is with the MailboxService, which is unlike
- * the {@link SendingMailbox} whose ownership lies with the {@link MailboxSendOperator}. This is because the
- * ReceivingMailbox can be initialized even before the corresponding OpChain is registered on the receiver, whereas
- * the SendingMailbox is initialized when the MailboxSendOperator is running. Also see {@link #isInitialized()}.
- *
- * @param <T> the unit of data that each {@link #receive()} call returns.
+ * the {@link SendingMailbox} whose ownership lies with the send operator. This is because the ReceivingMailbox can be
+ * initialized even before the corresponding OpChain is registered on the receiver, whereas the SendingMailbox is
+ * initialized when the send operator is running.
  */
-public interface ReceivingMailbox<T> {
+public class ReceivingMailbox {
+  public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReceivingMailbox.class);
+  private static final TransferableBlock CANCELLED_ERROR_BLOCK =
+      TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled by receiver"));
+
+  private final String _id;
+  private final Consumer<String> _receiveMailCallback;
+  // TODO: Make the queue size configurable
+  // TODO: Revisit if this is the correct way to apply back pressure
+  private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
+  private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
+
+  public ReceivingMailbox(String id, Consumer<String> receiveMailCallback) {
+    _id = id;
+    _receiveMailCallback = receiveMailCallback;
+  }
 
-  MailboxIdentifier getId();
+  public String getId() {
+    return _id;
+  }
 
   /**
-   * Returns a unit of data. Implementations are allowed to return null, in which case {@link MailboxReceiveOperator}
-   * will assume that this mailbox doesn't have any data to return and it will instead poll the other mailbox (if any).
+   * Offers a non-error block into the mailbox within the timeout specified, returns whether the block is successfully
+   * added. If the block is not added, an error block is added to the mailbox.
    */
-  @Nullable
-  T receive() throws Exception;
+  public boolean offer(TransferableBlock block, long timeoutMs) {
+    if (_errorBlock.get() != null) {
+      LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
+      return false;
+    }
+    if (timeoutMs < 0) {
+      LOGGER.debug("Mailbox: {} is already timed out", _id);
+      setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
+          new TimeoutException("Timed out while offering data to mailbox: " + _id)));
+      return false;
+    }
+    try {
+      if (_blocks.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
+        if (_errorBlock.get() == null) {
+          _receiveMailCallback.accept(_id);
+          return true;
+        } else {
+          LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
+          _blocks.clear();
+          return false;
+        }
+      } else {
+        LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms", _id, timeoutMs);
+        setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
+            new TimeoutException("Timed out while waiting for receive operator to consume data from mailbox: " + _id)));
+        return false;
+      }
+    } catch (InterruptedException e) {
+      LOGGER.error("Interrupted while offering block into mailbox: {}", _id);
+      setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(e));
+      return false;
+    }
+  }
 
   /**
-   * A ReceivingMailbox is considered initialized when it has a reference to the underlying channel used for receiving
-   * the data. The underlying channel may be a gRPC stream, in-memory queue, etc. Once a receiving mailbox is
-   * initialized, it has the ability to close the underlying channel via the {@link #cancel()} method.
+   * Sets an error block into the mailbox. No more blocks are accepted after calling this method.
    */
-  boolean isInitialized();
+  public void setErrorBlock(TransferableBlock errorBlock) {
+    if (_errorBlock.compareAndSet(null, errorBlock)) {
+      _blocks.clear();
+      _receiveMailCallback.accept(_id);
+    }
+  }
 
   /**
-   * A ReceivingMailbox is considered closed if it has sent all the data to the receiver and doesn't have any more data
-   * to send.
+   * Returns the first block from the mailbox, or {@code null} if there is no block received yet. The error block is
+   * returned if one exists.
    */
-  boolean isClosed();
+  @Nullable
+  public TransferableBlock poll() {
+    TransferableBlock errorBlock = _errorBlock.get();
+    return errorBlock != null ? errorBlock : _blocks.poll();
+  }
 
   /**
-   * A ReceivingMailbox may hold a reference to the underlying channel. Usually the channel would be automatically
-   * closed once all the data has been received by the receiver, and in such cases {@link #isClosed()} returns true.
-   * However in failure scenarios the underlying channel may not be released, and the receiver can use this method to
-   * ensure the same.
-   *
-   * This API should ensure that the underlying channel is "released" if it hasn't been already. If the channel has
-   * already been released, the API shouldn't throw and instead return gracefully.
-   *
-   * <p>
-   *   This method may be called multiple times, so implementations should ensure this is idempotent.
-   * </p>
+   * Cancels the mailbox. No more blocks are accepted after calling this method. Should only be called by the receive
+   * operator to clean up the remaining blocks.
    */
-  void cancel();
+  public void cancel() {
+    LOGGER.debug("Cancelling mailbox: {}", _id);
+    if (_errorBlock.compareAndSet(null, CANCELLED_ERROR_BLOCK)) {
+      _blocks.clear();
+    }
+  }

Review Comment:
   This should accept the upstream throwable here:
   ```suggestion
     public void cancel(Throwable t) {
       LOGGER.debug("Cancelling mailbox: {}", _id, t);
       if (_errorBlock.compareAndSet(null, TransferableBlockUtils.getErrorTransferableBlock(t))) {
         _blocks.clear();
       }
     }
   ```
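For illustration, here is a minimal, self-contained sketch of the tombstone behaviour this suggestion relies on. Once `cancel(Throwable)` records the upstream cause, buffered blocks are dropped, late offers are rejected, and `poll()` surfaces the error. `TombstoneMailbox` is hypothetical and simplified: blocks are plain strings, an `"ERROR: "` prefix stands in for an error `TransferableBlock`, and a timed-out offer simply returns `false` instead of setting an error block.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for ReceivingMailbox (not Pinot code).
// Blocks are plain strings; an "ERROR: " prefix stands in for an error TransferableBlock.
class TombstoneMailbox {
  private final BlockingQueue<String> _blocks;
  // Non-null once the mailbox is cancelled or errored; the mailbox then acts as a tombstone.
  private final AtomicReference<String> _errorBlock = new AtomicReference<>();

  TombstoneMailbox(int capacity) {
    _blocks = new ArrayBlockingQueue<>(capacity);
  }

  // Returns true only if the block was enqueued and the mailbox is still live.
  // Simplification: a timeout just returns false rather than recording an error block.
  boolean offer(String block, long timeoutMs) {
    if (_errorBlock.get() != null) {
      return false; // late block after cancel/error: rejected without buffering
    }
    try {
      if (!_blocks.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
        return false;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      cancel(e);
      return false;
    }
    if (_errorBlock.get() != null) {
      // cancel() raced with this offer after clearing the queue; drop the late block
      _blocks.clear();
      return false;
    }
    return true;
  }

  // Records the upstream cause; only the first cancel/error wins, and buffered blocks are dropped.
  void cancel(Throwable t) {
    if (_errorBlock.compareAndSet(null, "ERROR: " + t.getMessage())) {
      _blocks.clear();
    }
  }

  // Returns the error block if set, otherwise the next data block (or null if none).
  String poll() {
    String errorBlock = _errorBlock.get();
    return errorBlock != null ? errorBlock : _blocks.poll();
  }

  int getNumPendingBlocks() {
    return _blocks.size();
  }
}
```

Because the first cause wins via `compareAndSet`, a second `cancel(...)` leaves the original upstream error visible to the receiver.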





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177071005


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -40,86 +36,67 @@
 /**
  * gRPC implementation of the {@link SendingMailbox}. The gRPC stream is created on the first call to {@link #send}.
  */
-public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
+public class GrpcSendingMailbox implements SendingMailbox {
   private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class);
 
-  private final String _mailboxId;
-  private final Function<Long, StreamObserver<MailboxContent>> _mailboxContentStreamObserverSupplier;
-  private final MailboxStatusStreamObserver _statusObserver;
+  private final String _id;
+  private final ChannelManager _channelManager;
+  private final String _hostname;
+  private final int _port;
   private final long _deadlineMs;
-  private final AtomicBoolean _initialized = new AtomicBoolean(false);
+  private final MailboxStatusObserver _statusObserver = new MailboxStatusObserver();
 
-  private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
-  private TransferableBlock _errorBlock;
+  private StreamObserver<MailboxContent> _contentObserver;
 
-  public GrpcSendingMailbox(MailboxIdentifier mailboxId, MailboxStatusStreamObserver statusObserver,
-      Function<Long, StreamObserver<MailboxContent>> contentStreamObserverSupplier, long deadlineMs) {
-    _mailboxId = mailboxId.toString();
-    _mailboxContentStreamObserverSupplier = contentStreamObserverSupplier;
-    _statusObserver = statusObserver;
+  public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostname, int port, long deadlineMs) {
+    _id = id;
+    _channelManager = channelManager;
+    _hostname = hostname;
+    _port = port;
     _deadlineMs = deadlineMs;
-    _errorBlock = null;
   }
 
   @Override
   public void send(TransferableBlock block)
-      throws Exception {
-    if (!_initialized.get()) {
-      open();
+      throws IOException {
+    if (_contentObserver == null) {
+      _contentObserver = getContentObserver();
     }
-    Preconditions.checkState(!_statusObserver.isFinished() || _errorBlock != null,
-        "Called send when stream is already closed for mailbox=%s", _mailboxId);
-    MailboxContent data = toMailboxContent(block.getDataBlock());
-    _mailboxContentStreamObserver.onNext(data);
+    Preconditions.checkState(!_statusObserver.isFinished(), "Mailbox: %s is already closed", _id);
+    _contentObserver.onNext(toMailboxContent(block));
   }
 
   @Override
-  public void complete()
-      throws Exception {
-    // TODO: should wait for _mailboxContentStreamObserver.onNext() finish before calling onComplete().
-    _mailboxContentStreamObserver.onCompleted();
-  }
-
-  @Override
-  public boolean isInitialized() {
-    return _initialized.get();
+  public void complete() {
+    _contentObserver.onCompleted();
   }
 
   @Override
   public void cancel(Throwable t) {
-    if (_initialized.get() && !_statusObserver.isFinished()) {
-      LOGGER.warn("GrpcSendingMailbox={} cancelling stream", _mailboxId);
+    if (!_statusObserver.isFinished()) {
+      LOGGER.debug("Cancelling mailbox: {}", _id);
+      if (_contentObserver == null) {
+        _contentObserver = getContentObserver();
+      }
       try {
-        RuntimeException e = new RuntimeException("Cancelled by the sender");
-        _errorBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
-        _mailboxContentStreamObserver.onNext(toMailboxContent(_errorBlock.getDataBlock()));
-        _mailboxContentStreamObserver.onError(Status.fromThrowable(e).asRuntimeException());
-      } catch (Exception e) {
-        // TODO: We don't necessarily need to log this since this is relatively quite likely to happen. Logging this
-        //  anyways as info for now so we can see how frequently this happens.
-        LOGGER.info("Unexpected error issuing onError to MailboxContentStreamObserver: {}", e.getMessage());
+        // NOTE: DO NOT use onError() because it will terminate the stream, and receiver might not get the callback
+        _contentObserver.onNext(toMailboxContent(
+            TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled by sender", t))));
+        _contentObserver.onCompleted();
+      } catch (Exception ignored) {
+        // Exception can be thrown if the stream is already closed, so we simply ignore it

Review Comment:
   It is normal for resource cleanup (`cancel()`) to try to send a message to a closed stream (e.g. `complete()` has been called, but the callback from the receiver hasn't arrived yet). Logging the error could potentially flood the log. We do log warnings when the receiver buffer is not properly cleaned up. I can add a debug log here so that we can see the exception when debugging an issue.
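To make the trade-off concrete, here is a small sketch of best-effort cancellation that tolerates an already-closed stream and logs the swallowed exception at a debug-like level. `OutboundStream` and `FakeStream` are hypothetical stand-ins for gRPC's `StreamObserver` (the fake is assumed to throw on writes after completion), and `java.util.logging` at `Level.FINE` replaces SLF4J's `debug` so the example runs without dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical minimal stand-in for a gRPC-style outbound stream (not the real io.grpc API).
interface OutboundStream {
  void onNext(String message);

  void onCompleted();
}

// Fake stream that, like a closed stream, rejects any call after completion.
class FakeStream implements OutboundStream {
  final List<String> sent = new ArrayList<>();
  private boolean _closed;

  public void onNext(String message) {
    if (_closed) {
      throw new IllegalStateException("stream already completed");
    }
    sent.add(message);
  }

  public void onCompleted() {
    if (_closed) {
      throw new IllegalStateException("stream already completed");
    }
    _closed = true;
  }
}

class SendingMailboxSketch {
  private static final Logger LOGGER = Logger.getLogger(SendingMailboxSketch.class.getName());
  private final OutboundStream _stream;
  int numIgnoredErrors; // illustration only: counts swallowed exceptions

  SendingMailboxSketch(OutboundStream stream) {
    _stream = stream;
  }

  void complete() {
    _stream.onCompleted();
  }

  // Best-effort cleanup: a closed stream is expected here, so log at FINE (debug) instead of WARNING.
  void cancel(Throwable t) {
    try {
      _stream.onNext("ERROR: Cancelled by sender: " + t.getMessage());
      _stream.onCompleted();
    } catch (Exception e) {
      numIgnoredErrors++;
      LOGGER.log(Level.FINE, "Ignoring exception while cancelling a possibly closed stream", e);
    }
  }
}
```

Calling `cancel(...)` after `complete()` is then harmless: the exception is recorded at a level that is normally filtered out, which matches the concern about flooding the log.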





[GitHub] [pinot] walterddr commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177066656


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -18,59 +18,49 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
-  private final MailboxIdentifier _mailboxId;
-  private final Supplier<InMemoryTransferStream> _transferStreamProvider;
-  private final Consumer<MailboxIdentifier> _gotMailCallback;
+public class InMemorySendingMailbox implements SendingMailbox {
+  private static final Logger LOGGER = LoggerFactory.getLogger(InMemorySendingMailbox.class);
 
-  private InMemoryTransferStream _transferStream;
+  private final String _id;
+  private final MailboxService _mailboxService;
+  private final long _deadlineMs;
 
-  public InMemorySendingMailbox(MailboxIdentifier mailboxId, Supplier<InMemoryTransferStream> transferStreamProvider,
-      Consumer<MailboxIdentifier> gotMailCallback) {
-    _mailboxId = mailboxId;
-    _transferStreamProvider = transferStreamProvider;
-    _gotMailCallback = gotMailCallback;
-  }
+  private ReceivingMailbox _receivingMailbox;
 
-  @Override
-  public void send(TransferableBlock data)
-      throws Exception {
-    if (!isInitialized()) {
-      initialize();
-    }
-    _transferStream.send(data);
-    _gotMailCallback.accept(_mailboxId);
+  public InMemorySendingMailbox(String id, MailboxService mailboxService, long deadlineMs) {
+    _id = id;
+    _mailboxService = mailboxService;
+    _deadlineMs = deadlineMs;
   }
 
   @Override
-  public void complete()
-      throws Exception {
-    _transferStream.complete();
-    _gotMailCallback.accept(_mailboxId);
+  public void send(TransferableBlock block) {
+    if (_receivingMailbox == null) {
+      _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
+    }
+    long timeoutMs = _deadlineMs - System.currentTimeMillis();
+    if (!_receivingMailbox.offer(block, timeoutMs)) {
+      throw new RuntimeException(String.format("Failed to offer block into mailbox: %s within: %dms", _id, timeoutMs));

Review Comment:
   Got it, that makes sense. We should probably throw `IOException` instead, then.
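A minimal sketch of what the suggested change might look like: the in-memory send path computes the remaining time from the deadline and reports a failed hand-off as a checked `IOException` rather than a `RuntimeException`. `InMemorySenderSketch` is hypothetical, and a plain `BlockingQueue` stands in for the receiving mailbox.

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (not Pinot code): a BlockingQueue stands in for the ReceivingMailbox.
class InMemorySenderSketch {
  private final String _id;
  private final BlockingQueue<String> _receiverQueue;
  private final long _deadlineMs;

  InMemorySenderSketch(String id, BlockingQueue<String> receiverQueue, long deadlineMs) {
    _id = id;
    _receiverQueue = receiverQueue;
    _deadlineMs = deadlineMs;
  }

  // Throws a checked IOException so callers handle the failure like any other transport error.
  void send(String block)
      throws IOException {
    long timeoutMs = _deadlineMs - System.currentTimeMillis();
    boolean offered;
    try {
      // An already-expired deadline fails fast instead of waiting.
      offered = timeoutMs >= 0 && _receiverQueue.offer(block, timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
      throw new IOException("Interrupted while offering block into mailbox: " + _id, e);
    }
    if (!offered) {
      throw new IOException(String.format("Failed to offer block into mailbox: %s within: %dms", _id, timeoutMs));
    }
  }
}
```

Since `GrpcSendingMailbox.send` in this PR already declares `throws IOException`, this choice would let both sending paths surface hand-off failures the same way.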





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10681: [multistage]re-implement mailbox

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177079909


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -18,80 +18,117 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.OpChain;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.pinot.query.mailbox.channel.ChannelManager;
+import org.apache.pinot.query.mailbox.channel.GrpcMailboxServer;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * Mailbox service that handles transfer for mailbox contents.
- *
- * @param <T> type of content supported by this mailbox service.
+ * Mailbox service that handles data transfer.
  */
-public interface MailboxService<T> {
+public class MailboxService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MailboxService.class);
+  private static final int DANGLING_RECEIVING_MAILBOX_EXPIRY_SECONDS = 300;
 
-  /**
-   * Starting the mailbox service.
-   */
-  void start();
+  // We use a cache to ensure the receiving mailboxes are not leaked in the cases where the corresponding OpChain is
+  // either never registered or died before the sender finished sending data.
+  private final Cache<String, ReceivingMailbox> _receivingMailboxCache =
+      CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY_SECONDS, TimeUnit.SECONDS)
+          .removalListener((RemovalListener<String, ReceivingMailbox>) notification -> {
+            if (notification.wasEvicted()) {
+              int numPendingBlocks = notification.getValue().getNumPendingBlocks();
+              if (numPendingBlocks > 0) {
+                LOGGER.warn("Evicting dangling receiving mailbox: {} with {} pending blocks", notification.getKey(),
+                    numPendingBlocks);
+              }
+            }
+          }).build();
 
-  /**
-   * Shutting down the mailbox service.
-   */
-  void shutdown();
+  private final String _hostname;
+  private final int _port;
+  private final PinotConfiguration _config;
+  private final Consumer<String> _receiveMailCallback;
+  private final ChannelManager _channelManager = new ChannelManager();
+
+  private GrpcMailboxServer _grpcMailboxServer;
+
+  public MailboxService(String hostname, int port, PinotConfiguration config, Consumer<String> receiveMailCallback) {
+    _hostname = hostname;
+    _port = port;
+    _config = config;

Review Comment:
   I feel it is okay to ask the caller to pass the port. The reason is that `MailboxService` will be initialized by multiple components, and there is no guarantee they use the same config key for the port (that is also why we cannot extract the hostname from the config).
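To illustrate the reasoning, here is a sketch in which each component reads its mailbox port under its own config key and passes it explicitly, so the service itself never needs to know which key was used. `MailboxServiceSketch`, `MailboxServiceFactories`, and both config keys are hypothetical, not Pinot's actual names.

```java
import java.util.Map;

// Hypothetical stand-in for MailboxService: hostname and port are passed in by the caller
// rather than looked up under a fixed config key.
class MailboxServiceSketch {
  final String hostname;
  final int port;

  MailboxServiceSketch(String hostname, int port) {
    this.hostname = hostname;
    this.port = port;
  }
}

class MailboxServiceFactories {
  // Hypothetical config keys: each component is free to use its own key for the mailbox port.
  static final String BROKER_PORT_KEY = "example.broker.mailbox.port";
  static final String SERVER_PORT_KEY = "example.server.mailbox.port";

  static MailboxServiceSketch forBroker(Map<String, String> config, String hostname) {
    return new MailboxServiceSketch(hostname, Integer.parseInt(config.get(BROKER_PORT_KEY)));
  }

  static MailboxServiceSketch forServer(Map<String, String> config, String hostname) {
    return new MailboxServiceSketch(hostname, Integer.parseInt(config.get(SERVER_PORT_KEY)));
  }
}
```

The key lookup stays in each component, and the service constructor only sees resolved values.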


