You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/12/14 19:15:44 UTC
[pinot] branch master updated: [multistage] [bugfix] Throw error when GrpcMailbox receiving buffer is full (#9969)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f7c281f2dc [multistage] [bugfix] Throw error when GrpcMailbox receiving buffer is full (#9969)
f7c281f2dc is described below
commit f7c281f2dc5d76ed787e0aeebbc6c629c8cec50e
Author: Yao Liu <ya...@startree.ai>
AuthorDate: Wed Dec 14 11:15:36 2022 -0800
[multistage] [bugfix] Throw error when GrpcMailbox receiving buffer is full (#9969)
* error out when receiving buffer full
* fix race condition
---
.../pinot/query/mailbox/GrpcReceivingMailbox.java | 1 -
.../channel/MailboxContentStreamObserver.java | 47 +++++++++++++++++++---
.../channel/MailboxStatusStreamObserver.java | 1 -
3 files changed, 42 insertions(+), 7 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
index bb6dce4b76..ce0152f1e6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
@@ -94,7 +94,6 @@ public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock>
return isInitialized() && _contentStreamObserver.isCompleted();
}
- // TODO: fix busy wait. This should be guarded by timeout.
private boolean waitForInitialize()
throws Exception {
if (_initializationLatch.getCount() > 0) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 1b6c8a8130..238c3cf60c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -23,6 +23,8 @@ import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
@@ -43,6 +45,14 @@ import org.slf4j.LoggerFactory;
*/
public class MailboxContentStreamObserver implements StreamObserver<Mailbox.MailboxContent> {
private static final Logger LOGGER = LoggerFactory.getLogger(MailboxContentStreamObserver.class);
+
+ private static Mailbox.MailboxContent createErrorContent(Throwable e)
+ throws IOException {
+ return Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom(
+ TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(e)).getDataBlock().toBytes()))
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build();
+ }
+
private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
private final GrpcMailboxService _mailboxService;
private final StreamObserver<Mailbox.MailboxStatus> _responseObserver;
@@ -50,6 +60,9 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
private final ArrayBlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
+
+ ReadWriteLock _errorLock = new ReentrantReadWriteLock();
+ private Mailbox.MailboxContent _errorContent = null; // Guarded by _errorLock.
private StringMailboxIdentifier _mailboxId;
private Consumer<MailboxIdentifier> _gotMailCallback;
@@ -73,6 +86,14 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
* to indicate when to call this method.
*/
public Mailbox.MailboxContent poll() {
+ try {
+ _errorLock.readLock().lock();
+ if (_errorContent != null) {
+ return _errorContent;
+ }
+ } finally {
+ _errorLock.readLock().unlock();
+ }
if (isCompleted()) {
return null;
}
@@ -93,7 +114,22 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
if (!mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY)) {
// when the receiving end receives a message put it in the mailbox queue.
- _receivingBuffer.offer(mailboxContent);
+ // TODO: pass a timeout to _receivingBuffer.
+ if (!_receivingBuffer.offer(mailboxContent)) {
+ // TODO: close the stream.
+ RuntimeException e = new RuntimeException("Mailbox receivingBuffer is full:" + _mailboxId);
+ LOGGER.error(e.getMessage());
+ try {
+ _errorLock.writeLock().lock();
+ _errorContent = createErrorContent(e);
+ } catch (IOException ioe) {
+ e = new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
+ LOGGER.error(e.getMessage());
+ throw e;
+ } finally {
+ _errorLock.writeLock().unlock();
+ }
+ }
_gotMailCallback.accept(_mailboxId);
if (_isEnabledFeedback) {
@@ -116,14 +152,15 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
@Override
public void onError(Throwable e) {
try {
- _receivingBuffer.offer(Mailbox.MailboxContent.newBuilder()
- .setPayload(ByteString.copyFrom(
- TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(e)).getDataBlock().toBytes()))
- .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build());
+ _errorLock.writeLock().lock();
+ _errorContent = createErrorContent(e);
_gotMailCallback.accept(_mailboxId);
+ // TODO: close the stream.
throw new RuntimeException(e);
} catch (IOException ioe) {
throw new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
+ } finally {
+ _errorLock.writeLock().unlock();
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
index e482583bd3..b758f0793d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.MailboxStatus> {
private static final Logger LOGGER = LoggerFactory.getLogger(MailboxStatusStreamObserver.class);
private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
- private static final long DEFAULT_MAILBOX_POLL_TIMEOUT_MS = 1000L;
private final AtomicInteger _bufferSize = new AtomicInteger(5);
private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org