You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/12/14 19:15:44 UTC

[pinot] branch master updated: [multistage] [bugfix] Throw error when GrpcMailbox receiving buffer is full (#9969)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f7c281f2dc [multistage] [bugfix] Throw error when GrpcMailbox receiving buffer is full (#9969)
f7c281f2dc is described below

commit f7c281f2dc5d76ed787e0aeebbc6c629c8cec50e
Author: Yao Liu <ya...@startree.ai>
AuthorDate: Wed Dec 14 11:15:36 2022 -0800

    [multistage] [bugfix] Throw error when GrpcMailbox receiving buffer is full (#9969)
    
    * error out when receiving buffer full
    * fix race condition
---
 .../pinot/query/mailbox/GrpcReceivingMailbox.java  |  1 -
 .../channel/MailboxContentStreamObserver.java      | 47 +++++++++++++++++++---
 .../channel/MailboxStatusStreamObserver.java       |  1 -
 3 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
index bb6dce4b76..ce0152f1e6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
@@ -94,7 +94,6 @@ public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock>
     return isInitialized() && _contentStreamObserver.isCompleted();
   }
 
-  // TODO: fix busy wait. This should be guarded by timeout.
   private boolean waitForInitialize()
       throws Exception {
     if (_initializationLatch.getCount() > 0) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 1b6c8a8130..238c3cf60c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -23,6 +23,8 @@ import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.query.mailbox.GrpcMailboxService;
@@ -43,6 +45,14 @@ import org.slf4j.LoggerFactory;
  */
 public class MailboxContentStreamObserver implements StreamObserver<Mailbox.MailboxContent> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentStreamObserver.class);
+
+  private static Mailbox.MailboxContent createErrorContent(Throwable e)
+      throws IOException {
+    return Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom(
+            TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(e)).getDataBlock().toBytes()))
+        .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build();
+  }
+
   private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
   private final GrpcMailboxService _mailboxService;
   private final StreamObserver<Mailbox.MailboxStatus> _responseObserver;
@@ -50,6 +60,9 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
 
   private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
   private final ArrayBlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
+
+  ReadWriteLock _errorLock = new ReentrantReadWriteLock();
+  private Mailbox.MailboxContent _errorContent = null; // Guarded by _errorLock.
   private StringMailboxIdentifier _mailboxId;
   private Consumer<MailboxIdentifier> _gotMailCallback;
 
@@ -73,6 +86,14 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
    * to indicate when to call this method.
    */
   public Mailbox.MailboxContent poll() {
+    try {
+      _errorLock.readLock().lock();
+      if (_errorContent != null) {
+        return _errorContent;
+      }
+    } finally {
+      _errorLock.readLock().unlock();
+    }
     if (isCompleted()) {
       return null;
     }
@@ -93,7 +114,22 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
 
     if (!mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY)) {
       // when the receiving end receives a message put it in the mailbox queue.
-      _receivingBuffer.offer(mailboxContent);
+      // TODO: pass a timeout to _receivingBuffer.
+      if (!_receivingBuffer.offer(mailboxContent)) {
+        // TODO: close the stream.
+        RuntimeException e = new RuntimeException("Mailbox receivingBuffer is full:" + _mailboxId);
+        LOGGER.error(e.getMessage());
+        try {
+          _errorLock.writeLock().lock();
+          _errorContent = createErrorContent(e);
+        } catch (IOException ioe) {
+          e = new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
+          LOGGER.error(e.getMessage());
+          throw e;
+        } finally {
+          _errorLock.writeLock().unlock();
+        }
+      }
       _gotMailCallback.accept(_mailboxId);
 
       if (_isEnabledFeedback) {
@@ -116,14 +152,15 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
   @Override
   public void onError(Throwable e) {
     try {
-      _receivingBuffer.offer(Mailbox.MailboxContent.newBuilder()
-          .setPayload(ByteString.copyFrom(
-              TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(e)).getDataBlock().toBytes()))
-          .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build());
+      _errorLock.writeLock().lock();
+      _errorContent = createErrorContent(e);
       _gotMailCallback.accept(_mailboxId);
+      // TODO: close the stream.
       throw new RuntimeException(e);
     } catch (IOException ioe) {
       throw new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
+    } finally {
+      _errorLock.writeLock().unlock();
     }
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
index e482583bd3..b758f0793d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.MailboxStatus> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxStatusStreamObserver.class);
   private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
-  private static final long DEFAULT_MAILBOX_POLL_TIMEOUT_MS = 1000L;
   private final AtomicInteger _bufferSize = new AtomicInteger(5);
   private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org