Posted to commits@pinot.apache.org by "ankitsultana (via GitHub)" <gi...@apache.org> on 2023/02/23 09:15:32 UTC

[GitHub] [pinot] ankitsultana opened a new pull request, #10322: [wip] [Do Not Review] [multistage] Fix Leaks in Mailbox

ankitsultana opened a new pull request, #10322:
URL: https://github.com/apache/pinot/pull/10322

   WIP


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] ankitsultana commented on a diff in pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "ankitsultana (via GitHub)" <gi...@apache.org>.
ankitsultana commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1128201146


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -93,5 +100,25 @@ protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox, Trans
     }
   }
 
-  protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block);
+  protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block)
+      throws Exception;
+
+  // Called when the OpChain gracefully returns.
+  // TODO: This is a no-op right now.
+  public void close() {
+    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+      if (sendingMailbox.isInitialized() && !sendingMailbox.isClosed()) {
+        LOGGER.info("SendingMailbox={} was not closed presumably because receiver hasn't completed processing",
+            sendingMailbox.getMailboxId());
+      }
+    }
+  }
+
+  public void cancel(Throwable t) {
+    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+      if (sendingMailbox.isInitialized() && !sendingMailbox.isClosed()) {
+        sendingMailbox.cancel(t);
+      }
+    }
+  }

Review Comment:
   There's no release for that, since ownership does not lie with `MailboxService`. By design, the sending and receiving mailboxes are asymmetric.
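The asymmetry can be illustrated with a minimal sketch. All names here (`LazySendingMailbox`, `SenderExchange`) are hypothetical stand-ins, not Pinot classes. The sending side owns its mailboxes, so on cancellation it cancels them directly instead of asking a service to release them, using the same `isInitialized()`/`isClosed()` guard as the `cancel` method in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sending mailbox that opens its channel lazily.
class LazySendingMailbox {
  private final String _mailboxId;
  private boolean _initialized = false;
  private boolean _closed = false;
  private Throwable _cancelCause = null;

  LazySendingMailbox(String mailboxId) {
    _mailboxId = mailboxId;
  }

  String getMailboxId() { return _mailboxId; }

  void send(String block) {
    // A real implementation would open the underlying channel here on first use.
    _initialized = true;
  }

  boolean isInitialized() { return _initialized; }
  boolean isClosed() { return _closed; }
  Throwable getCancelCause() { return _cancelCause; }

  void cancel(Throwable t) {
    _cancelCause = t;
    _closed = true;
  }
}

// Hypothetical exchange that owns its sending mailboxes; no MailboxService is involved.
class SenderExchange {
  private final List<LazySendingMailbox> _sendingMailboxes = new ArrayList<>();

  SenderExchange(List<LazySendingMailbox> mailboxes) {
    _sendingMailboxes.addAll(mailboxes);
  }

  // Only mailboxes that actually opened a channel and are still open hold resources worth cancelling.
  int cancel(Throwable t) {
    int cancelled = 0;
    for (LazySendingMailbox mailbox : _sendingMailboxes) {
      if (mailbox.isInitialized() && !mailbox.isClosed()) {
        mailbox.cancel(t);
        cancelled++;
      }
    }
    return cancelled;
  }
}
```

A mailbox that never sent data was never initialized, so there is nothing to cancel for it, which is why the sketch skips it.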




[GitHub] [pinot] walterddr commented on pull request #10322: [wip] [Do Not Review] [multistage] Fix Leaks in Mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on PR #10322:
URL: https://github.com/apache/pinot/pull/10322#issuecomment-1447537459

   Great work cleaning up the stability issues, thank you @ankitsultana. Please comment when this PR is ready for final review; I will take a look and comment as we go as well.



[GitHub] [pinot] walterddr commented on a diff in pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1126714732


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -50,21 +55,51 @@
   int getMailboxPort();
 
   /**
-   * Look up a receiving mailbox by {@link MailboxIdentifier}.
-   *
-   * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist already, but it might not have been
-   * initialized.
+   * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}.
    *
    * @param mailboxId mailbox identifier.
    * @return a receiving mailbox.
    */
   ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId);
 
   /**
-   * Look up a sending mailbox by {@link MailboxIdentifier}.
+   * Same as {@link #getReceivingMailbox} but this would return null if the mailbox isn't already created.
+   */
+  @Nullable
+  ReceivingMailbox<T> getReceivingMailboxIfPresent(MailboxIdentifier mailboxId);
+
+  /**
+   * Return a sending-mailbox for the given {@link MailboxIdentifier}. The returned {@link SendingMailbox} is
+   * uninitialized, i.e. it will not open the underlying channel or acquire any additional resources. Instead the
+   * {@link SendingMailbox} will initialize lazily when the data is sent for the first time through it.
    *
    * @param mailboxId mailbox identifier.
+   * @param deadlineMs deadline in milliseconds, which is usually the same as the query deadline.
    * @return a sending mailbox.
    */
-  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId);
+  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId, long deadlineMs);
+
+  /**
+   * A {@link ReceivingMailbox} for a given {@link OpChain} may be created before the OpChain is even registered.
+   * Reason being that the sender starts sending data, and the receiver starts receiving the same without waiting for
+   * the OpChain to be registered. The ownership for the ReceivingMailbox hence lies with the MailboxService and not
+   * the OpChain. There are two ways in which a MailboxService may release its references to a ReceivingMailbox and
+   * the underlying resources:
+   *
+   * <ol>
+   *   <li>
+   *     If the OpChain corresponding to a ReceivingMailbox was closed or cancelled. In that case,
+   *     {@link MailboxReceiveOperator} will call this method as part of its close/cancel call. This is the main
+   *     reason why this method exists.
+   *   </li>
+   *   <li>
+   *     There can be cases where the corresponding OpChain was never registered with the scheduler. In that case, it
+   *     is up to the {@link MailboxService} to ensure that there are no leaks of resources. E.g. it could setup a
+   *     periodic job to detect such mailbox and do any clean-up. Note that for this case, it is not mandatory for
+   *     the {@link MailboxService} to use this method. It can use any internal method it needs to do the clean-up.
+   *   </li>
+   * </ol>
+   * @param mailboxId
+   */
+  void releaseReceivingMailbox(MailboxIdentifier mailboxId);

Review Comment:
   IMO:
   
   1. `MailboxService` ALWAYS owns the `MailboxContentStreamObserver` (and thus the `GRPCReceivingMailbox`).
       - Throughout the interaction with the gRPC layer, both the mailbox gRPC stub (`GRPCMailboxServer.open`) and the `MailboxReceiveOperator` access `MailboxService.getReceivingMailbox`, so we cannot transfer ownership of the receiving mailbox to the mailbox operator (e.g. by deleting it from the cache).
   2. `MailboxReceiveOperator` DOES NOT OWN the observer, so it should ONLY interact with the mailbox through `MailboxService` methods. This means:
       - it should not acquire any receiving mailbox object; rather, it should only use the APIs to get contents out of the receiving mailbox.
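A minimal sketch of the ownership model described in this comment, assuming string mailbox IDs and string payloads in place of `MailboxIdentifier` and `TransferableBlock`. `OwningMailboxService` and its methods are hypothetical, not Pinot's API. The transport callback and the operator both go through the service, which alone holds each receiving buffer; the operator never keeps a reference to it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical service that is the sole owner of receiving buffers, keyed by mailbox ID.
class OwningMailboxService {
  private final Map<String, ConcurrentLinkedQueue<String>> _receivingBuffers = new ConcurrentHashMap<>();

  // Transport side (e.g. a gRPC stream observer): the buffer may be created here
  // before any operator has asked for it.
  void onData(String mailboxId, String block) {
    _receivingBuffers.computeIfAbsent(mailboxId, k -> new ConcurrentLinkedQueue<>()).offer(block);
  }

  // Operator side: reads through the service; returns null when no data is available yet.
  String poll(String mailboxId) {
    return _receivingBuffers.computeIfAbsent(mailboxId, k -> new ConcurrentLinkedQueue<>()).poll();
  }

  // Drops the service's cached reference; the service decides what other cleanup that implies.
  void release(String mailboxId) {
    _receivingBuffers.remove(mailboxId);
  }

  boolean contains(String mailboxId) {
    return _receivingBuffers.containsKey(mailboxId);
  }
}
```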




[GitHub] [pinot] ankitsultana commented on a diff in pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "ankitsultana (via GitHub)" <gi...@apache.org>.
ankitsultana commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1128209233


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -68,6 +68,7 @@ protected void run()
         @Override
         public void runJob() {
           boolean isFinished = false;
+          boolean returnedErrorBlock = false;

Review Comment:
   Let's keep them in the same PR, since this enforces the behavior that we should call Operator#cancel when an error-block is returned.
   
   A MailboxReceiveOperator may receive data from multiple mailboxes (say 10 of them).
   
   If one of them returns an error-block, we bubble it up immediately, so the other mailboxes may still be active. So we shouldn't simply close the Operator, but rather cancel it to signal early termination.
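A hypothetical sketch of why an error block should trigger cancel rather than close. `Block`, `UpstreamMailbox`, `MultiMailboxReceiveOperator` and `SchedulerStep` are illustrative names, not Pinot classes, and the sketch assumes a single-threaded operator; the scheduler step mirrors the `returnedErrorBlock` flag from the diff.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical block: either a data block or an error block.
final class Block {
  final String payload;
  final boolean isError;

  Block(String payload, boolean isError) {
    this.payload = payload;
    this.isError = isError;
  }
}

// Hypothetical upstream mailbox that may still be streaming data.
final class UpstreamMailbox {
  final ArrayDeque<Block> pending = new ArrayDeque<>();
  boolean cancelled = false;

  void cancel() {
    cancelled = true;
    pending.clear();
  }
}

// Hypothetical receive operator reading from several upstream mailboxes.
final class MultiMailboxReceiveOperator {
  private final List<UpstreamMailbox> _mailboxes;

  MultiMailboxReceiveOperator(List<UpstreamMailbox> mailboxes) {
    _mailboxes = new ArrayList<>(mailboxes);
  }

  // Returns the first available block; an error block is bubbled up immediately.
  Block nextBlock() {
    for (UpstreamMailbox mailbox : _mailboxes) {
      Block block = mailbox.pending.poll();
      if (block != null) {
        return block;
      }
    }
    return null;
  }

  // Early termination: tell every still-active upstream to stop.
  void cancel() {
    for (UpstreamMailbox mailbox : _mailboxes) {
      if (!mailbox.cancelled) {
        mailbox.cancel();
      }
    }
  }
}

final class SchedulerStep {
  // An error block triggers cancel() instead of a graceful close().
  static boolean runOnce(MultiMailboxReceiveOperator operator) {
    Block block = operator.nextBlock();
    boolean returnedErrorBlock = block != null && block.isError;
    if (returnedErrorBlock) {
      operator.cancel();
    }
    return returnedErrorBlock;
  }
}
```

A graceful close would leave the healthy upstream streaming into a receiver that will never read again; cancelling signals it to stop.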




[GitHub] [pinot] walterddr commented on a diff in pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1126672929


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java:
##########
@@ -18,45 +18,62 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.apache.pinot.common.proto.Mailbox;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * GRPC-based implementation of {@link MailboxService}.
+ * GRPC-based implementation of {@link MailboxService}. Note that there can be cases where the ReceivingMailbox
+ * and/or the underlying connection can be leaked:
  *
- * <p>It maintains a collection of connected mailbox servers and clients to remote hosts. All indexed by the
- * mailboxID in the format of: <code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code>
+ * <ol>
+ *   <li>When the OpChain corresponding to the receiver was never registered.</li>
+ *   <li>When the receiving OpChain exited before data was sent for the first time by the sender.</li>
+ * </ol>
  *
- * <p>Connections are established/initiated from the sender side and only tier-down from the sender side as well.
- * In the event of exception or timed out, the connection is cloased based on a mutually agreed upon timeout period
- * after the last successful message sent/received.
- *
- * <p>Noted that:
- * <ul>
- *   <li>the latter part of the mailboxID consist of the channelID.</li>
- *   <li>the job_id should be uniquely identifying a send/receving pair, for example if one bundle job requires
- *   to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to distinguish the 2 different mailbox.</li>
- * </ul>
+ * To handle these cases, we store the {@link ReceivingMailbox} entries in a time-expiring cache. If there was a
+ * leak, the entry would be evicted, and in that case we also issue a cancel to ensure the underlying stream is also
+ * released.
  */
 public class GrpcMailboxService implements MailboxService<TransferableBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcMailboxService.class);
   // channel manager
+  private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = Duration.ofMinutes(5);
   private final ChannelManager _channelManager;
   private final String _hostname;
   private final int _mailboxPort;
 
-  // maintaining a list of registered mailboxes.
-  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
-      new ConcurrentHashMap<>();
+  // We use a cache to ensure that the receiving mailbox and the underlying gRPC stream are not leaked in the cases
+  // where the corresponding OpChain is either never registered or died before the sender sent data for the first time.
+  private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache =
+      CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(), TimeUnit.MINUTES)
+          .removalListener(new RemovalListener<String, GrpcReceivingMailbox>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, GrpcReceivingMailbox> notification) {
+              if (notification.wasEvicted()) {
+                // TODO: This should be tied with query deadline, but for that we need to know the query deadline
+                //  when the GrpcReceivingMailbox is initialized in MailboxContentStreamObserver.
+                LOGGER.info("Removing dangling GrpcReceivingMailbox: {}", notification.getKey());
+                notification.getValue().cancel();

Review Comment:
   Cancel needs to be idempotent, since there's a chance it will be called multiple times.
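One way to make cancellation idempotent without a lock is a single compare-and-set on a state field. This is a sketch under assumptions (`IdempotentReceivingMailbox` is hypothetical): whichever caller reaches a terminal state first, for example the cache's removal listener or the operator, performs the release, and every later `cancel` or `close` is a no-op.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical receiving mailbox whose terminal transitions are idempotent and lock-free.
final class IdempotentReceivingMailbox {
  enum State { OPEN, CLOSED, CANCELLED }

  private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
  // Counts how often the underlying stream was actually released (for illustration only).
  private final AtomicInteger _releaseCount = new AtomicInteger();

  // Safe to call any number of times from any thread: only the first terminal transition wins.
  boolean cancel() {
    if (_state.compareAndSet(State.OPEN, State.CANCELLED)) {
      _releaseCount.incrementAndGet(); // e.g. signal an error on the stream here
      return true;
    }
    return false;
  }

  boolean close() {
    if (_state.compareAndSet(State.OPEN, State.CLOSED)) {
      _releaseCount.incrementAndGet(); // e.g. complete the stream here
      return true;
    }
    return false;
  }

  State state() { return _state.get(); }

  int releaseCount() { return _releaseCount.get(); }
}
```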



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java:
##########
@@ -18,45 +18,62 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.apache.pinot.common.proto.Mailbox;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * GRPC-based implementation of {@link MailboxService}.
+ * GRPC-based implementation of {@link MailboxService}. Note that there can be cases where the ReceivingMailbox
+ * and/or the underlying connection can be leaked:
  *
- * <p>It maintains a collection of connected mailbox servers and clients to remote hosts. All indexed by the
- * mailboxID in the format of: <code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code>
+ * <ol>
+ *   <li>When the OpChain corresponding to the receiver was never registered.</li>
+ *   <li>When the receiving OpChain exited before data was sent for the first time by the sender.</li>
+ * </ol>
  *
- * <p>Connections are established/initiated from the sender side and only tier-down from the sender side as well.
- * In the event of exception or timed out, the connection is cloased based on a mutually agreed upon timeout period
- * after the last successful message sent/received.
- *
- * <p>Noted that:
- * <ul>
- *   <li>the latter part of the mailboxID consist of the channelID.</li>
- *   <li>the job_id should be uniquely identifying a send/receving pair, for example if one bundle job requires
- *   to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to distinguish the 2 different mailbox.</li>
- * </ul>
+ * To handle these cases, we store the {@link ReceivingMailbox} entries in a time-expiring cache. If there was a
+ * leak, the entry would be evicted, and in that case we also issue a cancel to ensure the underlying stream is also
+ * released.
  */
 public class GrpcMailboxService implements MailboxService<TransferableBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcMailboxService.class);
   // channel manager
+  private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = Duration.ofMinutes(5);
   private final ChannelManager _channelManager;
   private final String _hostname;
   private final int _mailboxPort;
 
-  // maintaining a list of registered mailboxes.
-  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
-      new ConcurrentHashMap<>();
+  // We use a cache to ensure that the receiving mailbox and the underlying gRPC stream are not leaked in the cases
+  // where the corresponding OpChain is either never registered or died before the sender sent data for the first time.
+  private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache =
+      CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(), TimeUnit.MINUTES)
+          .removalListener(new RemovalListener<String, GrpcReceivingMailbox>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, GrpcReceivingMailbox> notification) {
+              if (notification.wasEvicted()) {
+                // TODO: This should be tied with query deadline, but for that we need to know the query deadline
+                //  when the GrpcReceivingMailbox is initialized in MailboxContentStreamObserver.
+                LOGGER.info("Removing dangling GrpcReceivingMailbox: {}", notification.getKey());

Review Comment:
   Change the log level to warning? Ideally the OpChain should cancel its mailboxes; gradually we should depend less on cache removal.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -50,21 +55,51 @@
   int getMailboxPort();
 
   /**
-   * Look up a receiving mailbox by {@link MailboxIdentifier}.
-   *
-   * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist already, but it might not have been
-   * initialized.
+   * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}.
    *
    * @param mailboxId mailbox identifier.
    * @return a receiving mailbox.
    */
   ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId);
 
   /**
-   * Look up a sending mailbox by {@link MailboxIdentifier}.
+   * Same as {@link #getReceivingMailbox} but this would return null if the mailbox isn't already created.
+   */
+  @Nullable
+  ReceivingMailbox<T> getReceivingMailboxIfPresent(MailboxIdentifier mailboxId);

Review Comment:
   This API is not needed. Make it private and annotate the method above as Nullable.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -50,21 +55,51 @@
   int getMailboxPort();
 
   /**
-   * Look up a receiving mailbox by {@link MailboxIdentifier}.
-   *
-   * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist already, but it might not have been
-   * initialized.
+   * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}.
    *
    * @param mailboxId mailbox identifier.
    * @return a receiving mailbox.
    */
   ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId);
 
   /**
-   * Look up a sending mailbox by {@link MailboxIdentifier}.
+   * Same as {@link #getReceivingMailbox} but this would return null if the mailbox isn't already created.
+   */
+  @Nullable
+  ReceivingMailbox<T> getReceivingMailboxIfPresent(MailboxIdentifier mailboxId);
+
+  /**
+   * Return a sending-mailbox for the given {@link MailboxIdentifier}. The returned {@link SendingMailbox} is
+   * uninitialized, i.e. it will not open the underlying channel or acquire any additional resources. Instead the
+   * {@link SendingMailbox} will initialize lazily when the data is sent for the first time through it.
    *
    * @param mailboxId mailbox identifier.
+   * @param deadlineMs deadline in milliseconds, which is usually the same as the query deadline.
    * @return a sending mailbox.
    */
-  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId);
+  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId, long deadlineMs);
+
+  /**
+   * A {@link ReceivingMailbox} for a given {@link OpChain} may be created before the OpChain is even registered.
+   * Reason being that the sender starts sending data, and the receiver starts receiving the same without waiting for
+   * the OpChain to be registered. The ownership for the ReceivingMailbox hence lies with the MailboxService and not
+   * the OpChain. There are two ways in which a MailboxService may release its references to a ReceivingMailbox and
+   * the underlying resources:
+   *
+   * <ol>
+   *   <li>
+   *     If the OpChain corresponding to a ReceivingMailbox was closed or cancelled. In that case,
+   *     {@link MailboxReceiveOperator} will call this method as part of its close/cancel call. This is the main
+   *     reason why this method exists.
+   *   </li>
+   *   <li>
+   *     There can be cases where the corresponding OpChain was never registered with the scheduler. In that case, it
+   *     is up to the {@link MailboxService} to ensure that there are no leaks of resources. E.g. it could setup a
+   *     periodic job to detect such mailbox and do any clean-up. Note that for this case, it is not mandatory for
+   *     the {@link MailboxService} to use this method. It can use any internal method it needs to do the clean-up.
+   *   </li>
+   * </ol>
+   * @param mailboxId
+   */
+  void releaseReceivingMailbox(MailboxIdentifier mailboxId);

Review Comment:
   I think releaseReceivingMailbox should only invalidate the mailbox cached inside the MailboxService.
   
   MailboxOperator will have:
   1. info regarding the actual mailbox
   2. the mailbox service that caches the mailboxID -> mailbox object mapping entry
   
   We need to make the ownership relation a bit clearer. To me, with the current model, cancel/close needs to be idempotent and state needs to be kept, even though each transition happens only once per mailbox. Each call into these state transitions will have to be guarded by locks, which is not ideal.




[GitHub] [pinot] ankitsultana commented on a diff in pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "ankitsultana (via GitHub)" <gi...@apache.org>.
ankitsultana commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1128195721


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java:
##########
@@ -49,12 +48,20 @@
  */
 public class MailboxContentStreamObserver implements StreamObserver<Mailbox.MailboxContent> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentStreamObserver.class);
+  private static final int DEFAULT_MAX_PENDING_MAILBOX_CONTENT = 5;
+  private static final long DEFAULT_QUEUE_POLL_TIMEOUT_MS = 120_000;

Review Comment:
   This is the timeout for the queue offer, not poll (it should be renamed). The receive operator will return null if there's no data in the receivingBuffer.
   
   This should ideally be tied to the query deadline, but we don't have it in MailboxContentStreamObserver right now. For now I've set it to a reasonably large value; we can consider either increasing or decreasing it.
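A minimal sketch of the behavior described here, assuming a bounded `ArrayBlockingQueue` as the receiving buffer. `BoundedReceiveBuffer` and the renamed constant `DEFAULT_QUEUE_OFFER_TIMEOUT_MS` are hypothetical; the values 5 and 120_000 come from the diff. The producer's `offer` waits up to the timeout while the buffer is full, whereas the consumer's `poll` never blocks and returns `null` when the buffer is empty.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical receive buffer: written by the transport thread, read by the receive operator.
final class BoundedReceiveBuffer {
  static final int DEFAULT_MAX_PENDING_MAILBOX_CONTENT = 5;
  // Hypothetical rename of DEFAULT_QUEUE_POLL_TIMEOUT_MS: it bounds offer(), not poll().
  static final long DEFAULT_QUEUE_OFFER_TIMEOUT_MS = 120_000;

  private final BlockingQueue<String> _receivingBuffer;
  private final long _offerTimeoutMs;

  BoundedReceiveBuffer() {
    this(DEFAULT_MAX_PENDING_MAILBOX_CONTENT, DEFAULT_QUEUE_OFFER_TIMEOUT_MS);
  }

  BoundedReceiveBuffer(int capacity, long offerTimeoutMs) {
    _receivingBuffer = new ArrayBlockingQueue<>(capacity);
    _offerTimeoutMs = offerTimeoutMs;
  }

  // Producer side: blocks while the buffer is full, up to the timeout.
  // Returns false if the consumer did not make room in time (or the thread was interrupted).
  boolean onNext(String content) {
    try {
      return _receivingBuffer.offer(content, _offerTimeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Consumer side: non-blocking; returns null when no data is buffered yet.
  String receive() {
    return _receivingBuffer.poll();
  }
}
```

The bounded capacity provides backpressure toward the sender, so the offer timeout effectively caps how long a stalled receiver can hold up the gRPC thread.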




[GitHub] [pinot] ankitsultana commented on a diff in pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "ankitsultana (via GitHub)" <gi...@apache.org>.
ankitsultana commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1128162136


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java:
##########
@@ -18,45 +18,62 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.apache.pinot.common.proto.Mailbox;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * GRPC-based implementation of {@link MailboxService}.
+ * GRPC-based implementation of {@link MailboxService}. Note that there can be cases where the ReceivingMailbox
+ * and/or the underlying connection can be leaked:
  *
- * <p>It maintains a collection of connected mailbox servers and clients to remote hosts. All indexed by the
- * mailboxID in the format of: <code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code>
+ * <ol>
+ *   <li>When the OpChain corresponding to the receiver was never registered.</li>
+ *   <li>When the receiving OpChain exited before data was sent for the first time by the sender.</li>
+ * </ol>
  *
- * <p>Connections are established/initiated from the sender side and only tier-down from the sender side as well.
- * In the event of exception or timed out, the connection is cloased based on a mutually agreed upon timeout period
- * after the last successful message sent/received.
- *
- * <p>Noted that:
- * <ul>
- *   <li>the latter part of the mailboxID consist of the channelID.</li>
- *   <li>the job_id should be uniquely identifying a send/receving pair, for example if one bundle job requires
- *   to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to distinguish the 2 different mailbox.</li>
- * </ul>
+ * To handle these cases, we store the {@link ReceivingMailbox} entries in a time-expiring cache. If there was a
+ * leak, the entry would be evicted, and in that case we also issue a cancel to ensure the underlying stream is also
+ * released.
  */
 public class GrpcMailboxService implements MailboxService<TransferableBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcMailboxService.class);
   // channel manager
+  private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = Duration.ofMinutes(5);
   private final ChannelManager _channelManager;
   private final String _hostname;
   private final int _mailboxPort;
 
-  // maintaining a list of registered mailboxes.
-  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
-      new ConcurrentHashMap<>();
+  // We use a cache to ensure that the receiving mailbox and the underlying gRPC stream are not leaked in the cases
+  // where the corresponding OpChain is either never registered or died before the sender sent data for the first time.
+  private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache =
+      CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(), TimeUnit.MINUTES)
+          .removalListener(new RemovalListener<String, GrpcReceivingMailbox>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, GrpcReceivingMailbox> notification) {
+              if (notification.wasEvicted()) {
+                // TODO: This should be tied with query deadline, but for that we need to know the query deadline
+                //  when the GrpcReceivingMailbox is initialized in MailboxContentStreamObserver.
+                LOGGER.info("Removing dangling GrpcReceivingMailbox: {}", notification.getKey());

Review Comment:
   Yeah, we can change the log level to warning.
   
   The cache-based cleanup only covers cases where the corresponding OpChain was never registered, or where the OpChain died before any data was received.
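
   Below is a JDK-only sketch of the eviction path discussed above. It approximates `CacheBuilder.expireAfterAccess(...)` combined with a `RemovalListener` that cancels evicted entries. Each lookup refreshes an access timestamp, and `evictIdle()` removes and cancels mailboxes that have been idle for at least the expiry. `CancellableMailbox` and `DanglingMailboxRegistry` are hypothetical names, not Pinot classes. The clock is injected so the behaviour is deterministic. Unlike Guava, nothing runs the sweep automatically, so a scheduled task is assumed.

   ```java
   import java.util.Iterator;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.LongSupplier;
   import java.util.function.Supplier;

   // Hypothetical stand-in for GrpcReceivingMailbox: only the cancel hook matters here.
   interface CancellableMailbox {
     void cancel();
   }

   // Hypothetical JDK-only approximation of Guava's expireAfterAccess + RemovalListener.
   class DanglingMailboxRegistry<M extends CancellableMailbox> {
     private static final class Slot<T> {
       final T mailbox;
       volatile long lastAccessMs;

       Slot(T mailbox, long nowMs) {
         this.mailbox = mailbox;
         this.lastAccessMs = nowMs;
       }
     }

     private final Map<String, Slot<M>> slots = new ConcurrentHashMap<>();
     private final long expiryMs;
     private final LongSupplier clockMs;

     DanglingMailboxRegistry(long expiryMs, LongSupplier clockMs) {
       this.expiryMs = expiryMs;
       this.clockMs = clockMs;
     }

     // Every lookup refreshes the access time, mirroring expire-after-access semantics.
     M getOrCreate(String mailboxId, Supplier<M> factory) {
       long now = clockMs.getAsLong();
       Slot<M> slot = slots.computeIfAbsent(mailboxId, id -> new Slot<>(factory.get(), now));
       slot.lastAccessMs = now;
       return slot.mailbox;
     }

     // Removes mailboxes idle for at least expiryMs and cancels them so the stream is released.
     // This sketch ignores the race where a lookup refreshes an entry while it is being swept.
     int evictIdle() {
       long now = clockMs.getAsLong();
       int evicted = 0;
       Iterator<Map.Entry<String, Slot<M>>> it = slots.entrySet().iterator();
       while (it.hasNext()) {
         Map.Entry<String, Slot<M>> e = it.next();
         if (now - e.getValue().lastAccessMs >= expiryMs) {
           it.remove();
           e.getValue().mailbox.cancel();
           evicted++;
         }
       }
       return evicted;
     }

     int size() {
       return slots.size();
     }
   }
   ```

   Explicit releases by an owning OpChain are left out on purpose. The sweep only handles mailboxes that nobody released, which matches the two leak cases listed in the `GrpcMailboxService` javadoc.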



[GitHub] [pinot] walterddr commented on a diff in pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1126819601


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -50,21 +55,51 @@
   int getMailboxPort();
 
   /**
-   * Look up a receiving mailbox by {@link MailboxIdentifier}.
-   *
-   * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist already, but it might not have been
-   * initialized.
+   * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}.
    *
    * @param mailboxId mailbox identifier.
    * @return a receiving mailbox.
    */
   ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId);
 
   /**
-   * Look up a sending mailbox by {@link MailboxIdentifier}.
+   * Same as {@link #getReceivingMailbox} but this would return null if the mailbox isn't already created.
+   */
+  @Nullable
+  ReceivingMailbox<T> getReceivingMailboxIfPresent(MailboxIdentifier mailboxId);
+
+  /**
+   * Return a sending-mailbox for the given {@link MailboxIdentifier}. The returned {@link SendingMailbox} is
+   * uninitialized, i.e. it will not open the underlying channel or acquire any additional resources. Instead the
+   * {@link SendingMailbox} will initialize lazily when the data is sent for the first time through it.
    *
    * @param mailboxId mailbox identifier.
+   * @param deadlineMs deadline in milliseconds, which is usually the same as the query deadline.
    * @return a sending mailbox.
    */
-  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId);
+  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId, long deadlineMs);
+
+  /**
+   * A {@link ReceivingMailbox} for a given {@link OpChain} may be created before the OpChain is even registered.
+   * Reason being that the sender starts sending data, and the receiver starts receiving the same without waiting for
+   * the OpChain to be registered. The ownership for the ReceivingMailbox hence lies with the MailboxService and not
+   * the OpChain. There are two ways in which a MailboxService may release its references to a ReceivingMailbox and
+   * the underlying resources:
+   *
+   * <ol>
+   *   <li>
+   *     If the OpChain corresponding to a ReceivingMailbox was closed or cancelled. In that case,
+   *     {@link MailboxReceiveOperator} will call this method as part of its close/cancel call. This is the main
+   *     reason why this method exists.
+   *   </li>
+   *   <li>
+   *     There can be cases where the corresponding OpChain was never registered with the scheduler. In that case, it
+   *     is up to the {@link MailboxService} to ensure that there are no leaks of resources. E.g. it could setup a
+   *     periodic job to detect such mailbox and do any clean-up. Note that for this case, it is not mandatory for
+   *     the {@link MailboxService} to use this method. It can use any internal method it needs to do the clean-up.
+   *   </li>
+   * </ol>
+   * @param mailboxId
+   */
+  void releaseReceivingMailbox(MailboxIdentifier mailboxId);

Review Comment:
   the relationship looks like this
   ```
    * ----------------------------------------------------------------------------
    * (Operator Layer)
    *            MailboxSendOperator ---------> MailboxReceiveOperator
    *                   |                                  |
    *                   |                                  |
    * ------------------|----------------------------------|----------------------
    *                   |                                  |
    * (MailboxService)  |                                  |  ( WAIT ON INIT )
    *                  \_/                                \_/                   
    *           SendingMailbox                      ReceivingMailbox
    * ------------------|---------------------------------/^\---------------------
    * (Physical Layer)  |                                  |
    * (e.g. GRPC)       |                                  |  ( INITIALIZE )
    *                   |                                  |
    *                  \_/                                 |
    *           StreamObserver -------------------> StreamObserver
    * ----------------------------------------------------------------------------
   ```
   and the only place where we require MailboxService to own the closeable object is the ReceivingMailbox.
   
   This means:
   1. We need to make sure that MailboxReceiveOperator gets the mailbox but does not store it.
       - Possibly change the getReceivingMailbox API to `TransferableBlock receiveMail(MailboxID)`.
   2. SendingMailbox can be changed to a util-based initialization and doesn't need to go through MailboxService (but for simplicity we can still keep the current approach).
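
   A minimal sketch of point 1. It assumes the service keeps one buffer per mailbox id and the operator only ever asks for the next block. `OwningMailboxService`, `onData` and `isRegistered` are hypothetical names. `receiveMail` follows the signature floated above. `String` stands in for both `TransferableBlock` and `MailboxIdentifier` so that the example is self-contained.

   ```java
   import java.util.Map;
   import java.util.Queue;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentLinkedQueue;

   // Hypothetical sketch: the service owns each receiving buffer and only hands out blocks.
   class OwningMailboxService {
     private final Map<String, Queue<String>> receivingMailboxes = new ConcurrentHashMap<>();

     // Called from the transport layer (e.g. a gRPC StreamObserver) when data arrives,
     // possibly before the receiving OpChain has been registered.
     void onData(String mailboxId, String block) {
       receivingMailboxes.computeIfAbsent(mailboxId, id -> new ConcurrentLinkedQueue<>()).offer(block);
     }

     // The operator asks for the next block by id and never holds a mailbox reference,
     // so the service stays the single owner of the closeable resource. Returns null if empty.
     String receiveMail(String mailboxId) {
       Queue<String> queue = receivingMailboxes.get(mailboxId);
       return queue == null ? null : queue.poll();
     }

     // Ownership ends here: the service drops (and in a real implementation would close) the mailbox.
     void releaseReceivingMailbox(String mailboxId) {
       receivingMailboxes.remove(mailboxId);
     }

     boolean isRegistered(String mailboxId) {
       return receivingMailboxes.containsKey(mailboxId);
     }
   }
   ```

   In this sketch, data arriving after `releaseReceivingMailbox` would re-create the entry. That is the dangling case the expiring cache in `GrpcMailboxService` is meant to catch.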



[GitHub] [pinot] walterddr commented on a diff in pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1126952976


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -18,79 +18,104 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.query.mailbox.channel.ChannelUtils;
+import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * GRPC implementation of the {@link SendingMailbox}.
+ * gRPC implementation of the {@link SendingMailbox}. The gRPC stream is created on the first call to {@link #send}.
  */
 public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class);
   private final String _mailboxId;
   private final AtomicBoolean _initialized = new AtomicBoolean(false);
-  private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
 
-  private final CountDownLatch _finishLatch;
-  private final StreamObserver<MailboxContent> _mailboxContentStreamObserver;
+  private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
+  private final Function<Long, StreamObserver<MailboxContent>> _mailboxContentStreamObserverSupplier;
+  private final MailboxStatusStreamObserver _statusObserver;
+  private final long _deadlineMs;
 
-  public GrpcSendingMailbox(String mailboxId, StreamObserver<MailboxContent> mailboxContentStreamObserver,
-      CountDownLatch latch) {
+  public GrpcSendingMailbox(String mailboxId, MailboxStatusStreamObserver statusObserver,
+      Function<Long, StreamObserver<MailboxContent>> contentStreamObserverSupplier, long deadlineMs) {
     _mailboxId = mailboxId;
-    _mailboxContentStreamObserver = mailboxContentStreamObserver;
-    _finishLatch = latch;
-    _initialized.set(false);
+    _mailboxContentStreamObserverSupplier = contentStreamObserverSupplier;
+    _statusObserver = statusObserver;
+    _deadlineMs = deadlineMs;
   }
 
   @Override
   public void send(TransferableBlock block)
-      throws UnsupportedOperationException {
+      throws Exception {
     if (!_initialized.get()) {
-      // initialization is special
       open();
     }
+    Preconditions.checkState(!_statusObserver.isFinished(),
+        "Called send when stream is already closed for mailbox=" + _mailboxId);
     MailboxContent data = toMailboxContent(block.getDataBlock());
     _mailboxContentStreamObserver.onNext(data);
-    _totalMsgSent.incrementAndGet();
   }
 
   @Override
-  public void complete() {
+  public void complete()
+      throws Exception {
     _mailboxContentStreamObserver.onCompleted();
   }
 
   @Override
-  public void open() {
-    // TODO: Get rid of init call.
-    // send a begin-of-stream message.
-    _mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
-        .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
-    _initialized.set(true);
+  public boolean isInitialized() {
+    return _initialized.get();
   }
 
+  /**
+   * As required by {@link SendingMailbox#isClosed()}, we return true only if this mailbox is done sending all the
+   * data and has released all underlying resources. To check whether all resources have been released, i.e. the
+   * underlying gRPC stream has been closed, we use {@link MailboxStatusStreamObserver#isFinished()}.
+   */
   @Override
-  public String getMailboxId() {
-    return _mailboxId;
+  public boolean isClosed() {

Review Comment:
   I don't think this API is needed.
   
   The mailbox should only expose send, complete, cancel, and close. Whether it is closed should be internal state of the mailbox.
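
   A sketch of the narrower surface suggested here. It assumes lifecycle state lives entirely inside the implementation. `MinimalSendingMailbox` and `InMemorySendingMailbox` are hypothetical names. The point is that `cancel` on an already-finished mailbox is a no-op, so callers such as `BlockExchange` would not need `isInitialized()`/`isClosed()` guards.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical minimal SendingMailbox surface: no isClosed()/isInitialized() exposed.
   interface MinimalSendingMailbox<T> {
     void send(T block) throws Exception;

     void complete() throws Exception;

     void cancel(Throwable t);

     void close();
   }

   // Toy in-memory implementation; lifecycle state is private and enforced internally.
   class InMemorySendingMailbox implements MinimalSendingMailbox<String> {
     private enum State { OPEN, COMPLETED, CANCELLED, CLOSED }

     private State state = State.OPEN;
     final List<String> delivered = new ArrayList<>();

     @Override
     public synchronized void send(String block) {
       if (state != State.OPEN) {
         throw new IllegalStateException("send() called after mailbox reached " + state);
       }
       delivered.add(block);
     }

     @Override
     public synchronized void complete() {
       if (state == State.OPEN) {
         state = State.COMPLETED;
       }
     }

     @Override
     public synchronized void cancel(Throwable t) {
       // Cancelling a mailbox that already finished is a no-op, so callers need no state check.
       if (state == State.OPEN) {
         state = State.CANCELLED;
       }
     }

     @Override
     public synchronized void close() {
       state = State.CLOSED;
     }
   }
   ```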



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -70,29 +79,37 @@ public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver streamObser
    *  2. If the received block from the sender is a data-block with 0 rows.
    * </p>
    */
+  @Nullable
   @Override
-  public TransferableBlock receive()
-      throws Exception {
+  public TransferableBlock receive() throws Exception {
     if (!waitForInitialize()) {
       return null;
     }
     MailboxContent mailboxContent = _contentStreamObserver.poll();
-    _totalMsgReceived.incrementAndGet();
     return mailboxContent == null ? null : fromMailboxContent(mailboxContent);
   }
 
   @Override
   public boolean isInitialized() {
-    return _initializationLatch.getCount() <= 0;
+    return _initializationLatch.getCount() == 0;
   }
 
   @Override
   public boolean isClosed() {
-    return isInitialized() && _contentStreamObserver.isCompleted();
+    return isInitialized() && _contentStreamObserver.hasConsumedAllData();
   }
 
   @Override
-  public void cancel(Throwable e) {
+  public void cancel() {
+    if (isInitialized()) {
+      try {
+        _statusStreamObserver.onError(Status.CANCELLED.asRuntimeException());
+      } catch (Exception e) {
+        // TODO: This can happen if the call is already closed. Consider removing this log altogether or find a way
+        //  to check if the stream is already closed.
+        LOGGER.info("Tried to cancel receiving mailbox", e);

Review Comment:
   Let's search for a better way to deal with the already-closed issue.
   I think I read an article about close/half-close conditions, but we can follow up later.
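
   Here is one possible direction, offered as a hedged sketch rather than a verified fix. gRPC's `StreamObserver` contract allows `onError`/`onCompleted` to be called only once, as the last call on the stream. The mailbox can therefore record the first terminal signal locally and skip later ones, instead of catching the exception. `TerminalOnceGuard` and `markClosedByPeer` are hypothetical names. The status observer would need to call `markClosedByPeer` when the peer ends the call. A race with the transport closing concurrently can still remain, so a defensive catch may still be warranted.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.function.Consumer;

   // Hypothetical wrapper: forwards at most one terminal signal to the underlying stream.
   class TerminalOnceGuard {
     private final AtomicBoolean terminated = new AtomicBoolean(false);
     private final Runnable completeAction;
     private final Consumer<Throwable> errorAction;

     TerminalOnceGuard(Runnable completeAction, Consumer<Throwable> errorAction) {
       this.completeAction = completeAction;
       this.errorAction = errorAction;
     }

     // Returns true only if this call actually delivered onCompleted.
     boolean complete() {
       if (!terminated.compareAndSet(false, true)) {
         return false;
       }
       completeAction.run();
       return true;
     }

     // Returns true only if this call actually delivered onError (e.g. Status.CANCELLED).
     boolean cancel(Throwable cause) {
       if (!terminated.compareAndSet(false, true)) {
         return false;
       }
       errorAction.accept(cause);
       return true;
     }

     // Hypothetical hook for the status observer: record that the peer already ended the call,
     // so a later cancel() does not try to signal an already-closed stream.
     void markClosedByPeer() {
       terminated.set(true);
     }
   }
   ```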



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -18,79 +18,104 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.query.mailbox.channel.ChannelUtils;
+import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * GRPC implementation of the {@link SendingMailbox}.
+ * gRPC implementation of the {@link SendingMailbox}. The gRPC stream is created on the first call to {@link #send}.
  */
 public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcSendingMailbox.class);
   private final String _mailboxId;
   private final AtomicBoolean _initialized = new AtomicBoolean(false);
-  private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
 
-  private final CountDownLatch _finishLatch;
-  private final StreamObserver<MailboxContent> _mailboxContentStreamObserver;
+  private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
+  private final Function<Long, StreamObserver<MailboxContent>> _mailboxContentStreamObserverSupplier;
+  private final MailboxStatusStreamObserver _statusObserver;
+  private final long _deadlineMs;
 
-  public GrpcSendingMailbox(String mailboxId, StreamObserver<MailboxContent> mailboxContentStreamObserver,
-      CountDownLatch latch) {
+  public GrpcSendingMailbox(String mailboxId, MailboxStatusStreamObserver statusObserver,
+      Function<Long, StreamObserver<MailboxContent>> contentStreamObserverSupplier, long deadlineMs) {
     _mailboxId = mailboxId;
-    _mailboxContentStreamObserver = mailboxContentStreamObserver;
-    _finishLatch = latch;
-    _initialized.set(false);
+    _mailboxContentStreamObserverSupplier = contentStreamObserverSupplier;
+    _statusObserver = statusObserver;
+    _deadlineMs = deadlineMs;
   }
 
   @Override
   public void send(TransferableBlock block)
-      throws UnsupportedOperationException {
+      throws Exception {
     if (!_initialized.get()) {
-      // initialization is special
       open();
     }
+    Preconditions.checkState(!_statusObserver.isFinished(),
+        "Called send when stream is already closed for mailbox=" + _mailboxId);
     MailboxContent data = toMailboxContent(block.getDataBlock());
     _mailboxContentStreamObserver.onNext(data);
-    _totalMsgSent.incrementAndGet();
   }
 
   @Override
-  public void complete() {
+  public void complete()
+      throws Exception {
     _mailboxContentStreamObserver.onCompleted();
   }
 
   @Override
-  public void open() {
-    // TODO: Get rid of init call.
-    // send a begin-of-stream message.
-    _mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
-        .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
-    _initialized.set(true);
+  public boolean isInitialized() {
+    return _initialized.get();
   }
 
+  /**
+   * As required by {@link SendingMailbox#isClosed()}, we return true only if this mailbox is done sending all the
+   * data and has released all underlying resources. To check whether all resources have been released, i.e. the
+   * underlying gRPC stream has been closed, we use {@link MailboxStatusStreamObserver#isFinished()}.
+   */
   @Override
-  public String getMailboxId() {
-    return _mailboxId;
+  public boolean isClosed() {
+    return _initialized.get() && _statusObserver.isFinished();
   }
 
   @Override
-  public void waitForFinish(long timeout, TimeUnit unit)
-      throws InterruptedException {
-    _finishLatch.await(timeout, unit);
+  public void cancel(Throwable t) {
+    if (_initialized.get() && !_statusObserver.isFinished()) {

Review Comment:
   Since you also check this here, there's no need to check isClosed().



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java:
##########
@@ -49,12 +48,20 @@
  */
 public class MailboxContentStreamObserver implements StreamObserver<Mailbox.MailboxContent> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentStreamObserver.class);
+  private static final int DEFAULT_MAX_PENDING_MAILBOX_CONTENT = 5;
+  private static final long DEFAULT_QUEUE_POLL_TIMEOUT_MS = 120_000;

Review Comment:
   What's the reason for this, and how was this value determined?
   Since the ReceiveOperator controls moving on to the next mailbox when the current one doesn't have data, shouldn't we avoid waiting here and simply return null so that the ReceiveOperator can decide what to do?
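
   This sketch shows the alternative the question implies. It assumes the 120-second timeout bounds a wait on the consumer side. The bound of 5 pending blocks still provides backpressure on the producer (`onNext`) side, but the consumer uses a non-blocking `poll()` that returns `null`, so the receive operator can move on. `BoundedReceiveBuffer` is a hypothetical name. Only the standard `ArrayBlockingQueue` methods `offer(e, timeout, unit)` and `poll()` are used.

   ```java
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.TimeUnit;

   // Hypothetical bounded receive buffer: bounded on the producer side, non-blocking on the consumer side.
   class BoundedReceiveBuffer<T> {
     private final BlockingQueue<T> buffer;

     BoundedReceiveBuffer(int maxPending) {
       buffer = new ArrayBlockingQueue<>(maxPending);
     }

     // Producer side (e.g. gRPC onNext): wait up to timeoutMs for space; false means still full.
     boolean offer(T content, long timeoutMs) {
       try {
         return buffer.offer(content, timeoutMs, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
       }
     }

     // Consumer side: returns immediately, null when nothing is buffered yet.
     T pollNonBlocking() {
       return buffer.poll();
     }
   }
   ```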



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java:
##########
@@ -179,23 +158,38 @@ public void onNext(Mailbox.MailboxContent mailboxContent) {
 
   @Override
   public void onError(Throwable e) {
-    try {
-      _errorLock.writeLock().lock();
-      _errorContent = createErrorContent(e);
-      _gotMailCallback.accept(_mailboxId);
-      throw new RuntimeException(e);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
-    } finally {
-      _errorLock.writeLock().unlock();
-      LOGGER.error("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", _mailboxId);
-    }
+    _errorContent = createErrorContent(e);
+    _streamFinished = true;
+    _gotMailCallback.accept(_mailboxId);
   }
 
   @Override
   public void onCompleted() {
     _isCompleted.set(true);
+    _streamFinished = true;
     _responseObserver.onCompleted();
-    LOGGER.debug("MaxBufferSize:", getMaxBufferSize(), " for mailbox:", _mailboxId);
+  }
+
+  public boolean hasConsumedAllData() {
+    return _isCompleted.get() && _receivingBuffer.isEmpty();
+  }
+
+  public boolean hasStreamFinished() {
+    return _streamFinished;
+  }

Review Comment:
   javadoc please. 
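
   Here is a possible wording for the requested javadoc. It is shown on a hypothetical, trimmed-down model of the observer's terminal-state flags; `ContentStreamState` is not a Pinot class. It follows the diff above: `onError` sets only the finished flag, so `hasConsumedAllData()` stays false after an error.

   ```java
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical model of MailboxContentStreamObserver's terminal-state flags.
   class ContentStreamState<T> {
     private final Queue<T> receivingBuffer = new ConcurrentLinkedQueue<>();
     private final AtomicBoolean isCompleted = new AtomicBoolean(false);
     private volatile boolean streamFinished = false;

     void onNext(T content) {
       receivingBuffer.offer(content);
     }

     void onError(Throwable e) {
       streamFinished = true;
     }

     void onCompleted() {
       isCompleted.set(true);
       streamFinished = true;
     }

     T poll() {
       return receivingBuffer.poll();
     }

     /**
      * Returns true once the sender has completed the stream normally and every buffered block
      * has been polled. Stays false after an error, since the stream did not complete normally.
      */
     boolean hasConsumedAllData() {
       return isCompleted.get() && receivingBuffer.isEmpty();
     }

     /**
      * Returns true once the stream has terminated, via either onCompleted or onError. Buffered
      * blocks may still be pending, so this does not imply {@link #hasConsumedAllData()}.
      */
     boolean hasStreamFinished() {
       return streamFinished;
     }
   }
   ```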



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -68,6 +68,7 @@ protected void run()
         @Override
         public void runJob() {
           boolean isFinished = false;
+          boolean returnedErrorBlock = false;

Review Comment:
   IMO this file's change can be checked in separately



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -71,15 +74,19 @@ protected BlockExchange(List<SendingMailbox<TransferableBlock>> sendingMailboxes
     _splitter = splitter;
   }
 
-  public void send(TransferableBlock block) {
+  public void send(TransferableBlock block)
+      throws Exception {
     if (block.isEndOfStreamBlock()) {
-      _sendingMailboxes.forEach(destination -> sendBlock(destination, block));
+      for (SendingMailbox sendingMailbox : _sendingMailboxes) {

Review Comment:
   Use the generic type:
   ```suggestion
         for (SendingMailbox<TransferableBlock> sendingMailbox : _sendingMailboxes) {
   ```



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -186,4 +186,25 @@ protected TransferableBlock getNextBlock() {
             : TransferableBlockUtils.getEndOfStreamTransferableBlock();
     return block;
   }
+
+  @Override
+  public void close() {
+    super.close();
+    for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
+      _mailboxService.releaseReceivingMailbox(sendingMailbox);
+    }
+  }
+
+  @Override
+  public void cancel(Throwable t) {
+    super.cancel(t);
+    for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
+      ReceivingMailbox<TransferableBlock> receivingMailbox = _mailboxService
+          .getReceivingMailboxIfPresent(sendingMailbox);
+      if (receivingMailbox != null) {
+        receivingMailbox.cancel();

Review Comment:
   Don't call cancel here. Instead, rely on releaseReceivingMailbox to call cancel.
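
   This sketch shows that division of responsibility. It assumes `releaseReceivingMailbox` becomes the only place that decides whether to cancel. It also assumes, as a policy choice, that release cancels only when the mailbox still has unconsumed data. `ReleasableReceivingMailbox` and `ReleasingMailboxService` are hypothetical names. Under this design, both `MailboxReceiveOperator.close()` and `cancel()` just call release, and release is idempotent.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical receiving-mailbox handle: only what release needs.
   interface ReleasableReceivingMailbox {
     boolean hasConsumedAllData();

     void cancel();
   }

   // Hypothetical service where release is the single place that decides whether to cancel.
   class ReleasingMailboxService {
     private final Map<String, ReleasableReceivingMailbox> mailboxes = new ConcurrentHashMap<>();

     void register(String mailboxId, ReleasableReceivingMailbox mailbox) {
       mailboxes.put(mailboxId, mailbox);
     }

     void releaseReceivingMailbox(String mailboxId) {
       // remove() returns null if the mailbox was never created or was already released,
       // so repeated calls are harmless; only a mailbox with unconsumed data is cancelled.
       ReleasableReceivingMailbox mailbox = mailboxes.remove(mailboxId);
       if (mailbox != null && !mailbox.hasConsumedAllData()) {
         mailbox.cancel();
       }
     }
   }
   ```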



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -93,5 +100,25 @@ protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox, Trans
     }
   }
 
-  protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block);
+  protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block)
+      throws Exception;
+
+  // Called when the OpChain gracefully returns.
+  // TODO: This is a no-op right now.
+  public void close() {
+    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+      if (sendingMailbox.isInitialized() && !sendingMailbox.isClosed()) {
+        LOGGER.info("SendingMailbox={} was not closed presumably because receiver hasn't completed processing",

Review Comment:
   With the suggested change, we don't need to log here at all.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -93,5 +100,25 @@ protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox, Trans
     }
   }
 
-  protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block);
+  protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block)
+      throws Exception;
+
+  // Called when the OpChain gracefully returns.
+  // TODO: This is a no-op right now.
+  public void close() {
+    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+      if (sendingMailbox.isInitialized() && !sendingMailbox.isClosed()) {
+        LOGGER.info("SendingMailbox={} was not closed presumably because receiver hasn't completed processing",
+            sendingMailbox.getMailboxId());
+      }
+    }
+  }
+
+  public void cancel(Throwable t) {
+    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+      if (sendingMailbox.isInitialized() && !sendingMailbox.isClosed()) {
+        sendingMailbox.cancel(t);
+      }
+    }
+  }

Review Comment:
   From a symmetry point of view, we can also add a `releaseSendingMailbox` method to MailboxService.
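
   This sketch shows what that symmetry could look like, with `BlockExchange` reduced to delegating. `SendingMailboxReleaser`, its two-argument `releaseSendingMailbox` and `DelegatingBlockExchange` are hypothetical names. A `null` cause stands for a graceful close, which is only one of several ways the close/cancel distinction could be carried.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical: the service, not BlockExchange, decides how to finish each sending mailbox.
   interface SendingMailboxReleaser {
     void releaseSendingMailbox(String mailboxId, Throwable cancelCause);
   }

   // Hypothetical trimmed BlockExchange: close() and cancel() only delegate, mirroring how
   // MailboxReceiveOperator delegates to releaseReceivingMailbox.
   class DelegatingBlockExchange {
     private final List<String> sendingMailboxIds;
     private final SendingMailboxReleaser releaser;

     DelegatingBlockExchange(List<String> sendingMailboxIds, SendingMailboxReleaser releaser) {
       this.sendingMailboxIds = new ArrayList<>(sendingMailboxIds);
       this.releaser = releaser;
     }

     // Graceful return of the OpChain: a null cause means "no error".
     void close() {
       for (String id : sendingMailboxIds) {
         releaser.releaseSendingMailbox(id, null);
       }
     }

     void cancel(Throwable t) {
       for (String id : sendingMailboxIds) {
         releaser.releaseSendingMailbox(id, t);
       }
     }
   }
   ```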



[GitHub] [pinot] ankitsultana commented on a diff in pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "ankitsultana (via GitHub)" <gi...@apache.org>.
ankitsultana commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1125056403


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -93,5 +100,25 @@ protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox, Trans
     }
   }
 
-  protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block);
+  protected abstract void route(List<SendingMailbox<TransferableBlock>> destinations, TransferableBlock block)
+      throws Exception;
+
+  // Called when the OpChain gracefully returns.
+  // TODO: This is a no-op right now.
+  public void close() {
+    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+      if (sendingMailbox.isInitialized() && !sendingMailbox.isClosed()) {
+        LOGGER.info("SendingMailbox={} was not closed presumably because receiver hasn't completed processing",

Review Comment:
   self-review: log level should be debug



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java:
##########
@@ -34,17 +34,19 @@ public class PlanRequestContext {
   protected final long _requestId;
   protected final int _stageId;
   private final long _timeoutMs;
+  private final long _deadlineMs;
   protected final VirtualServerAddress _server;
   protected final Map<Integer, StageMetadata> _metadataMap;
   protected final List<MailboxIdentifier> _receivingMailboxes = new ArrayList<>();
 
 
   public PlanRequestContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId,
-      long timeoutMs, VirtualServerAddress server, Map<Integer, StageMetadata> metadataMap) {
+      long timeoutMs, long deadlineMs, VirtualServerAddress server, Map<Integer, StageMetadata> metadataMap) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _stageId = stageId;
     _timeoutMs = timeoutMs;

Review Comment:
   self-review: We don't need to store timeoutMs anymore. We can pass deadline directly to `MailboxReceiveOperator`.
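
   This sketch passes a deadline instead of a timeout. It assumes the deadline is computed once, when the request arrives. `QueryDeadline` is a hypothetical name. Downstream code derives its remaining wait from one absolute `deadlineMs`, rather than each component restarting a full `timeoutMs` from its own start time.

   ```java
   // Hypothetical helper: convert the query timeout into an absolute deadline once, then
   // derive the remaining time from that deadline wherever a bounded wait is needed.
   final class QueryDeadline {
     private final long deadlineMs;

     private QueryDeadline(long deadlineMs) {
       this.deadlineMs = deadlineMs;
     }

     static QueryDeadline fromTimeout(long nowMs, long timeoutMs) {
       return new QueryDeadline(nowMs + timeoutMs);
     }

     long deadlineMs() {
       return deadlineMs;
     }

     // Never negative, so it is safe to pass to bounded waits such as BlockingQueue.poll(timeout, unit).
     long remainingMs(long nowMs) {
       return Math.max(0L, deadlineMs - nowMs);
     }

     boolean isExpired(long nowMs) {
       return nowMs >= deadlineMs;
     }
   }
   ```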



[GitHub] [pinot] ankitsultana commented on a diff in pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "ankitsultana (via GitHub)" <gi...@apache.org>.
ankitsultana commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1128158528


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java:
##########
@@ -18,45 +18,62 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.apache.pinot.common.proto.Mailbox;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * GRPC-based implementation of {@link MailboxService}.
+ * GRPC-based implementation of {@link MailboxService}. Note that there can be cases where the ReceivingMailbox
+ * and/or the underlying connection can be leaked:
  *
- * <p>It maintains a collection of connected mailbox servers and clients to remote hosts. All indexed by the
- * mailboxID in the format of: <code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code>
+ * <ol>
+ *   <li>When the OpChain corresponding to the receiver was never registered.</li>
+ *   <li>When the receiving OpChain exited before data was sent for the first time by the sender.</li>
+ * </ol>
  *
- * <p>Connections are established/initiated from the sender side and only tier-down from the sender side as well.
- * In the event of exception or timed out, the connection is cloased based on a mutually agreed upon timeout period
- * after the last successful message sent/received.
- *
- * <p>Noted that:
- * <ul>
- *   <li>the latter part of the mailboxID consist of the channelID.</li>
- *   <li>the job_id should be uniquely identifying a send/receving pair, for example if one bundle job requires
- *   to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to distinguish the 2 different mailbox.</li>
- * </ul>
+ * To handle these cases, we store the {@link ReceivingMailbox} entries in a time-expiring cache. If there was a
+ * leak, the entry would be evicted, and in that case we also issue a cancel to ensure the underlying stream is also
+ * released.
  */
 public class GrpcMailboxService implements MailboxService<TransferableBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcMailboxService.class);
   // channel manager
+  private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY = Duration.ofMinutes(5);
   private final ChannelManager _channelManager;
   private final String _hostname;
   private final int _mailboxPort;
 
-  // maintaining a list of registered mailboxes.
-  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
-      new ConcurrentHashMap<>();
+  // We use a cache to ensure that the receiving mailbox and the underlying gRPC stream are not leaked in the cases
+  // where the corresponding OpChain is either never registered or died before the sender sent data for the first time.
+  private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache =
+      CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(), TimeUnit.MINUTES)
+          .removalListener(new RemovalListener<String, GrpcReceivingMailbox>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, GrpcReceivingMailbox> notification) {
+              if (notification.wasEvicted()) {
+                // TODO: This should be tied with query deadline, but for that we need to know the query deadline
+                //  when the GrpcReceivingMailbox is initialized in MailboxContentStreamObserver.
+                LOGGER.info("Removing dangling GrpcReceivingMailbox: {}", notification.getKey());
+                notification.getValue().cancel();

Review Comment:
   Yeah, that's required. I'll add it to the interface javadoc.
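Below is a minimal sketch of the expire-after-access pattern from the diff above. It uses only the standard library. `CancellableMailbox` is a hypothetical stand-in for `GrpcReceivingMailbox`, and `evictExpired(nowMs)` is a hypothetical method. Guava's `CacheBuilder` works differently: it evicts lazily during routine cache operations and reports that through `RemovalNotification.wasEvicted()`. This sketch evicts only when `evictExpired` is called, for example from a periodic task. Like the removal listener in the diff, it cancels entries that expired but not ones that were released explicitly.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for GrpcReceivingMailbox: only cancellation is modeled.
interface CancellableMailbox {
  void cancel();
}

// Expire-after-access registry: idle entries are evicted and cancelled, explicit releases are not.
final class DanglingMailboxCache<M extends CancellableMailbox> {
  private static final class Entry<T> {
    final T _mailbox;
    volatile long _lastAccessMs;

    Entry(T mailbox, long nowMs) {
      _mailbox = mailbox;
      _lastAccessMs = nowMs;
    }
  }

  private final Map<String, Entry<M>> _entries = new ConcurrentHashMap<>();
  private final long _expiryMs;

  DanglingMailboxCache(long expiryMs) {
    _expiryMs = expiryMs;
  }

  // Returns the mailbox for the id, creating it if absent; every lookup refreshes the access time.
  M get(String id, Function<String, M> factory, long nowMs) {
    Entry<M> entry = _entries.computeIfAbsent(id, key -> new Entry<>(factory.apply(key), nowMs));
    entry._lastAccessMs = nowMs;
    return entry._mailbox;
  }

  // Explicit release (e.g. the receiving OpChain closed): removed without cancel,
  // mirroring the wasEvicted() == false branch of the removal listener.
  M release(String id) {
    Entry<M> entry = _entries.remove(id);
    return entry == null ? null : entry._mailbox;
  }

  // Evicts every entry idle for at least expiryMs, cancels it, and returns how many were evicted.
  int evictExpired(long nowMs) {
    int evicted = 0;
    for (Map.Entry<String, Entry<M>> e : _entries.entrySet()) {
      Entry<M> entry = e.getValue();
      // remove(key, value) only succeeds if the entry was not replaced concurrently.
      if (nowMs - entry._lastAccessMs >= _expiryMs && _entries.remove(e.getKey(), entry)) {
        entry._mailbox.cancel();
        evicted++;
      }
    }
    return evicted;
  }

  int size() {
    return _entries.size();
  }
}
```

The sketch passes the clock value in explicitly so that it stays deterministic. A real service would use `System.currentTimeMillis()` or a scheduled executor. The idle check in `evictExpired` is not atomic with `get`. A mailbox touched at the exact moment of eviction could therefore still be cancelled. Guava's cache handles this bookkeeping internally.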



[GitHub] [pinot] ankitsultana commented on a diff in pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "ankitsultana (via GitHub)" <gi...@apache.org>.
ankitsultana commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1128186807


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -70,29 +79,37 @@ public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver streamObser
    *  2. If the received block from the sender is a data-block with 0 rows.
    * </p>
    */
+  @Nullable
   @Override
-  public TransferableBlock receive()
-      throws Exception {
+  public TransferableBlock receive() throws Exception {
     if (!waitForInitialize()) {
       return null;
     }
     MailboxContent mailboxContent = _contentStreamObserver.poll();
-    _totalMsgReceived.incrementAndGet();
     return mailboxContent == null ? null : fromMailboxContent(mailboxContent);
   }
 
   @Override
   public boolean isInitialized() {
-    return _initializationLatch.getCount() <= 0;
+    return _initializationLatch.getCount() == 0;
   }
 
   @Override
   public boolean isClosed() {
-    return isInitialized() && _contentStreamObserver.isCompleted();
+    return isInitialized() && _contentStreamObserver.hasConsumedAllData();
   }
 
   @Override
-  public void cancel(Throwable e) {
+  public void cancel() {
+    if (isInitialized()) {
+      try {
+        _statusStreamObserver.onError(Status.CANCELLED.asRuntimeException());
+      } catch (Exception e) {
+        // TODO: This can happen if the call is already closed. Consider removing this log altogether or find a way
+        //  to check if the stream is already closed.
+        LOGGER.info("Tried to cancel receiving mailbox", e);

Review Comment:
   This is not related to gRPC but to our business logic. There can be concurrent cancellations/terminations of the stream, so this can always happen.
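Below is one way to make a cancel path like `GrpcReceivingMailbox.cancel()` tolerate the concurrent terminations described above. `StatusSink` is a hypothetical interface standing in for the gRPC `StreamObserver`. An `AtomicBoolean` guards local cancellation, so only one caller issues `onError`. A failure from `onError` is still treated as benign, because another path (for example, the sender completing) may already have closed the stream. This is a sketch under those assumptions, not the PR's implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for the status StreamObserver; only onError is modeled.
interface StatusSink {
  void onError(Throwable t);
}

// Sketch of a receiving-side cancel that tolerates concurrent cancellations/terminations.
final class CancellableReceiver {
  private final StatusSink _statusSink;
  private final Consumer<RuntimeException> _logger;
  private final AtomicBoolean _cancelRequested = new AtomicBoolean(false);

  CancellableReceiver(StatusSink statusSink, Consumer<RuntimeException> logger) {
    _statusSink = statusSink;
    _logger = logger;
  }

  // Returns true only for the call that actually delivered the cancellation to the stream.
  boolean cancel() {
    // Only the first caller proceeds; later local cancellations are no-ops.
    if (!_cancelRequested.compareAndSet(false, true)) {
      return false;
    }
    try {
      // A plain RuntimeException stands in for Status.CANCELLED.asRuntimeException().
      _statusSink.onError(new RuntimeException("CANCELLED"));
      return true;
    } catch (RuntimeException e) {
      // The stream may already have been closed by another path (e.g. the sender finished).
      _logger.accept(e);
      return false;
    }
  }
}
```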



[GitHub] [pinot] walterddr merged pull request #10322: [multistage] Fix Leaks in Mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr merged PR #10322:
URL: https://github.com/apache/pinot/pull/10322


[GitHub] [pinot] codecov-commenter commented on pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #10322:
URL: https://github.com/apache/pinot/pull/10322#issuecomment-1454231367

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#10322](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8161206) into [master](https://codecov.io/gh/apache/pinot/commit/2e03df02f65d766bde040389e4e6b12ea09710fa?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2e03df0) will **decrease** coverage by `21.51%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #10322       +/-   ##
   =============================================
   - Coverage     35.25%   13.74%   -21.51%     
   + Complexity      254      231       -23     
   =============================================
     Files          2034     1981       -53     
     Lines        110249   107934     -2315     
     Branches      16760    16480      -280     
   =============================================
   - Hits          38865    14834    -24031     
   - Misses        68021    91918    +23897     
   + Partials       3363     1182     -2181     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests2 | `13.74% <0.00%> (-0.02%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...apache/pinot/query/mailbox/GrpcMailboxService.java](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9HcnBjTWFpbGJveFNlcnZpY2UuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ache/pinot/query/mailbox/GrpcReceivingMailbox.java](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9HcnBjUmVjZWl2aW5nTWFpbGJveC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...apache/pinot/query/mailbox/GrpcSendingMailbox.java](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9HcnBjU2VuZGluZ01haWxib3guamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...he/pinot/query/mailbox/InMemoryMailboxService.java](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9Jbk1lbW9yeU1haWxib3hTZXJ2aWNlLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [.../pinot/query/mailbox/InMemoryReceivingMailbox.java](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9Jbk1lbW9yeVJlY2VpdmluZ01haWxib3guamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...he/pinot/query/mailbox/InMemorySendingMailbox.java](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9Jbk1lbW9yeVNlbmRpbmdNYWlsYm94LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...inot/query/mailbox/MultiplexingMailboxService.java](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9NdWx0aXBsZXhpbmdNYWlsYm94U2VydmljZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../query/mailbox/channel/InMemoryTransferStream.java](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9jaGFubmVsL0luTWVtb3J5VHJhbnNmZXJTdHJlYW0uamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [.../mailbox/channel/MailboxContentStreamObserver.java](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9jaGFubmVsL01haWxib3hDb250ZW50U3RyZWFtT2JzZXJ2ZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...y/mailbox/channel/MailboxStatusStreamObserver.java](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9jaGFubmVsL01haWxib3hTdGF0dXNTdHJlYW1PYnNlcnZlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | ... and [786 more](https://codecov.io/gh/apache/pinot/pull/10322?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
[GitHub] [pinot] walterddr commented on a diff in pull request #10322: [Draft] [multistage] Fix Leaks in Mailbox

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1126724681


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -50,21 +55,51 @@
   int getMailboxPort();
 
   /**
-   * Look up a receiving mailbox by {@link MailboxIdentifier}.
-   *
-   * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist already, but it might not have been
-   * initialized.
+   * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}.
    *
    * @param mailboxId mailbox identifier.
    * @return a receiving mailbox.
    */
   ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId);
 
   /**
-   * Look up a sending mailbox by {@link MailboxIdentifier}.
+   * Same as {@link #getReceivingMailbox} but this would return null if the mailbox isn't already created.
+   */
+  @Nullable
+  ReceivingMailbox<T> getReceivingMailboxIfPresent(MailboxIdentifier mailboxId);
+
+  /**
+   * Return a sending-mailbox for the given {@link MailboxIdentifier}. The returned {@link SendingMailbox} is
+   * uninitialized, i.e. it will not open the underlying channel or acquire any additional resources. Instead the
+   * {@link SendingMailbox} will initialize lazily when the data is sent for the first time through it.
    *
    * @param mailboxId mailbox identifier.
+   * @param deadlineMs deadline in milliseconds, which is usually the same as the query deadline.
    * @return a sending mailbox.
    */
-  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId);
+  SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId, long deadlineMs);
+
+  /**
+   * A {@link ReceivingMailbox} for a given {@link OpChain} may be created before the OpChain is even registered.
+   * Reason being that the sender starts sending data, and the receiver starts receiving the same without waiting for
+   * the OpChain to be registered. The ownership for the ReceivingMailbox hence lies with the MailboxService and not
+   * the OpChain. There are two ways in which a MailboxService may release its references to a ReceivingMailbox and
+   * the underlying resources:
+   *
+   * <ol>
+   *   <li>
+   *     If the OpChain corresponding to a ReceivingMailbox was closed or cancelled. In that case,
+   *     {@link MailboxReceiveOperator} will call this method as part of its close/cancel call. This is the main
+   *     reason why this method exists.
+   *   </li>
+   *   <li>
+   *     There can be cases where the corresponding OpChain was never registered with the scheduler. In that case, it
+   *     is up to the {@link MailboxService} to ensure that there are no leaks of resources. E.g. it could set up a
+   *     periodic job to detect such mailboxes and do any clean-up. Note that for this case, it is not mandatory for
+   *     the {@link MailboxService} to use this method. It can use any internal method it needs to do the clean-up.
+   *   </li>
+   * </ol>
+   * @param mailboxId
+   */
+  void releaseReceivingMailbox(MailboxIdentifier mailboxId);

Review Comment:
   On the contrary, on the sending side the MailboxService NEVER owns the `GrpcSendingMailbox`, because it is always constructed and initialized by the `MailboxSendObserver`. The mailbox send observer therefore holds the reference to the `StatusStreamObserver` and is responsible for calling close/cancel on it.
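Below is a sketch of the sender-side ownership described in this comment, combined with the lazily initialized `SendingMailbox` from the javadoc above. All names are hypothetical: `LazySendingMailbox`, `SendChannel`, and the injected clock. The sketch assumes a single sending thread. It only illustrates three points:

- No channel is opened until the first `send()`.
- The deadline is checked before sending.
- The owner, not a mailbox service, completes or cancels the channel.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical stand-in for the underlying gRPC stream held by a sending mailbox.
interface SendChannel {
  void write(String block);

  void complete();

  void cancel();
}

// Sketch of a sending mailbox that acquires its channel lazily on the first send().
// Not thread-safe: assumes a single sender thread.
final class LazySendingMailbox {
  private final Supplier<SendChannel> _opener;
  private final long _deadlineMs;
  private final LongSupplier _clock;
  private SendChannel _channel; // stays null until the first send()

  LazySendingMailbox(Supplier<SendChannel> opener, long deadlineMs, LongSupplier clock) {
    _opener = opener;
    _deadlineMs = deadlineMs;
    _clock = clock;
  }

  boolean isInitialized() {
    return _channel != null;
  }

  void send(String block) {
    // Refuse to open or use the channel once the (query) deadline has passed.
    if (_clock.getAsLong() > _deadlineMs) {
      throw new IllegalStateException("Deadline exceeded before send");
    }
    if (_channel == null) {
      _channel = _opener.get();
    }
    _channel.write(block);
  }

  // Called by the owner of this mailbox (the sending side) on graceful completion.
  void complete() {
    if (_channel != null) {
      _channel.complete();
    }
  }

  // Called by the owner on failure; a never-initialized mailbox holds nothing to release.
  void cancel() {
    if (_channel != null) {
      _channel.cancel();
    }
  }
}
```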


