You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2023/02/01 18:41:49 UTC
[pinot] branch master updated: [multistage] [refactor] Introduce cancel on error API for MultiStageOperator and sending and receiving mailbox (#10170)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7b75ed2cb7 [multistage] [refactor] Introduce cancel on error API for MultiStageOperator and sending and receiving mailbox (#10170)
7b75ed2cb7 is described below
commit 7b75ed2cb7e8372b0f860e66fd0f191eba5026de
Author: Yao Liu <ya...@startree.ai>
AuthorDate: Wed Feb 1 10:41:42 2023 -0800
[multistage] [refactor] Introduce cancel on error API for MultiStageOperator and sending and receiving mailbox (#10170)
* cancel
* clean up
* error format
* fix todo
* all type support and fix
---
.../pinot/query/mailbox/GrpcMailboxService.java | 2 +-
.../pinot/query/mailbox/GrpcReceivingMailbox.java | 9 +++---
.../pinot/query/mailbox/GrpcSendingMailbox.java | 31 ++++++++++++------
.../query/mailbox/InMemoryReceivingMailbox.java | 4 +++
.../query/mailbox/InMemorySendingMailbox.java | 13 ++++++--
.../pinot/query/mailbox/ReceivingMailbox.java | 2 ++
.../apache/pinot/query/mailbox/SendingMailbox.java | 7 ++++
.../channel/MailboxContentStreamObserver.java | 2 --
.../channel/MailboxStatusStreamObserver.java | 37 ++++++----------------
.../runtime/operator/MailboxSendOperator.java | 2 +-
.../query/runtime/operator/MultiStageOperator.java | 14 ++++++--
.../pinot/query/runtime/operator/OpChain.java | 12 ++++++-
.../pinot/query/runtime/operator/SortOperator.java | 4 +++
13 files changed, 90 insertions(+), 49 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
index 280210f22f..fc42388a11 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -98,7 +98,7 @@ public class GrpcMailboxService implements MailboxService<TransferableBlock> {
*/
public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
return _receivingMailboxMap.computeIfAbsent(
- mailboxId.toString(), (mId) -> new GrpcReceivingMailbox(mId, this, _gotMailCallback));
+ mailboxId.toString(), (mId) -> new GrpcReceivingMailbox(mId, _gotMailCallback));
}
public ManagedChannel getChannel(String mailboxId) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
index ce0152f1e6..60e4b444ca 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
@@ -40,7 +40,6 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
*/
public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
private static final long DEFAULT_MAILBOX_INIT_TIMEOUT = 100L;
- private final GrpcMailboxService _mailboxService;
private final String _mailboxId;
private Consumer<MailboxIdentifier> _gotMailCallback;
private final CountDownLatch _initializationLatch;
@@ -48,9 +47,7 @@ public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock>
private MailboxContentStreamObserver _contentStreamObserver;
- public GrpcReceivingMailbox(String mailboxId, GrpcMailboxService mailboxService,
- Consumer<MailboxIdentifier> gotMailCallback) {
- _mailboxService = mailboxService;
+ public GrpcReceivingMailbox(String mailboxId, Consumer<MailboxIdentifier> gotMailCallback) {
_mailboxId = mailboxId;
_gotMailCallback = gotMailCallback;
_initializationLatch = new CountDownLatch(1);
@@ -94,6 +91,10 @@ public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock>
return isInitialized() && _contentStreamObserver.isCompleted();
}
+ @Override
+ public void cancel(Throwable e) {
+ }
+
private boolean waitForInitialize()
throws Exception {
if (_initializationLatch.getCount() > 0) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index ca8e72cef8..1f446952a8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -20,7 +20,10 @@ package org.apache.pinot.query.mailbox;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.datablock.DataBlock;
@@ -42,7 +45,9 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
private final AtomicBoolean _initialized = new AtomicBoolean(false);
private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
- private MailboxStatusStreamObserver _statusStreamObserver;
+ private CountDownLatch _finishLatch = new CountDownLatch(1);
+
+ private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
public GrpcSendingMailbox(String mailboxId, GrpcMailboxService mailboxService) {
_mailboxService = mailboxService;
@@ -54,13 +59,11 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
throws UnsupportedOperationException {
ManagedChannel channel = _mailboxService.getChannel(_mailboxId);
PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
- _statusStreamObserver = new MailboxStatusStreamObserver();
- _statusStreamObserver.init(stub.open(_statusStreamObserver));
+ _mailboxContentStreamObserver = stub.open(new MailboxStatusStreamObserver(_finishLatch));
+ // TODO: Replace init call with metadata.
// send a begin-of-stream message.
- _statusStreamObserver.send(MailboxContent.newBuilder()
- .setMailboxId(_mailboxId)
- .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true")
- .build());
+ _mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
_initialized.set(true);
}
@@ -72,13 +75,13 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
init();
}
MailboxContent data = toMailboxContent(block.getDataBlock());
- _statusStreamObserver.send(data);
+ _mailboxContentStreamObserver.onNext(data);
_totalMsgSent.incrementAndGet();
}
@Override
public void complete() {
- _statusStreamObserver.complete();
+ _mailboxContentStreamObserver.onCompleted();
}
@Override
@@ -86,6 +89,16 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
return _mailboxId;
}
+ @Override
+ public void waitForFinish(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ _finishLatch.await(timeout, unit);
+ }
+
+ @Override
+ public void cancel(Throwable t) {
+ }
+
private MailboxContent toMailboxContent(DataBlock dataBlock) {
try {
Mailbox.MailboxContent.Builder builder = Mailbox.MailboxContent.newBuilder().setMailboxId(_mailboxId)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
index ac75c39993..43f32c61c5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
@@ -63,4 +63,8 @@ public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBl
public boolean isClosed() {
return _closed && _queue.size() == 0;
}
+
+ @Override
+ public void cancel(Throwable e) {
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index ebde5b9c41..81528f7106 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.mailbox;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -46,8 +47,7 @@ public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock>
if (!_queue.offer(data)) {
// this should never happen, since we use a LinkedBlockingQueue
// which does not have capacity bounds
- throw new IllegalStateException("Failed to insert into in-memory mailbox "
- + _mailboxId);
+ throw new IllegalStateException("Failed to insert into in-memory mailbox " + _mailboxId);
}
_gotMailCallback.accept(JsonMailboxIdentifier.parse(_mailboxId));
}
@@ -55,4 +55,13 @@ public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock>
@Override
public void complete() {
}
+
+ @Override
+ public void waitForFinish(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ }
+
+ @Override
+ public void cancel(Throwable t) {
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index ea999807d0..377430883c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -54,4 +54,6 @@ public interface ReceivingMailbox<T> {
* @return
*/
boolean isClosed();
+
+ void cancel(Throwable e);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 11cee29f0a..7d7b547324 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.query.mailbox;
+import java.util.concurrent.TimeUnit;
+
+
/**
* Mailbox is used to send and receive data.
*
@@ -46,4 +49,8 @@ public interface SendingMailbox<T> {
* Complete delivery of the current mailbox.
*/
void complete();
+
+ void waitForFinish(long timeout, TimeUnit unit) throws InterruptedException;
+
+ void cancel(Throwable t);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 4a35c8d660..f53a584c51 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -57,7 +57,6 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build();
}
- private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
private final GrpcMailboxService _mailboxService;
private final StreamObserver<Mailbox.MailboxStatus> _responseObserver;
private final boolean _isEnabledFeedback;
@@ -183,7 +182,6 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
_errorLock.writeLock().lock();
_errorContent = createErrorContent(e);
_gotMailCallback.accept(_mailboxId);
- // TODO: close the stream.
throw new RuntimeException(e);
} catch (IOException ioe) {
throw new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
index b758f0793d..fd7443db12 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
@@ -19,7 +19,7 @@
package org.apache.pinot.query.mailbox.channel;
import io.grpc.stub.StreamObserver;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.proto.Mailbox;
import org.slf4j.Logger;
@@ -30,31 +30,18 @@ import org.slf4j.LoggerFactory;
* {@code MailboxStatusStreamObserver} is used by the SendingMailbox to send data over the wire.
*
* <p>Once {@link org.apache.pinot.query.mailbox.GrpcSendingMailbox#init()} is called, one instances of this class is
- * created based on the opened GRPC connection returned {@link StreamObserver}. From this point, the sending mailbox
- * can use the {@link MailboxStatusStreamObserver#send(Mailbox.MailboxContent)} API to send data packet to the receiving
+ * created based on the opened GRPC connection returned {@link StreamObserver}.
* end.
*/
public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.MailboxStatus> {
private static final Logger LOGGER = LoggerFactory.getLogger(MailboxStatusStreamObserver.class);
private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
private final AtomicInteger _bufferSize = new AtomicInteger(5);
- private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
- private StreamObserver<Mailbox.MailboxContent> _mailboxContentStreamObserver;
+ private CountDownLatch _finishLatch;
- public MailboxStatusStreamObserver() {
- }
-
- public void init(StreamObserver<Mailbox.MailboxContent> mailboxContentStreamObserver) {
- _mailboxContentStreamObserver = mailboxContentStreamObserver;
- }
-
- public void send(Mailbox.MailboxContent mailboxContent) {
- _mailboxContentStreamObserver.onNext(mailboxContent);
- }
-
- public void complete() {
- _mailboxContentStreamObserver.onCompleted();
+ public MailboxStatusStreamObserver(CountDownLatch finishLatch) {
+ _finishLatch = finishLatch;
}
@Override
@@ -63,8 +50,8 @@ public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.Mailb
// so we can make better throughput send judgement. here is a simple example.
// TODO: this feedback info is not used to throttle the send speed. it is currently being discarded.
if (mailboxStatus.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY)) {
- _bufferSize.set(Integer.parseInt(
- mailboxStatus.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY)));
+ _bufferSize.set(
+ Integer.parseInt(mailboxStatus.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY)));
} else {
_bufferSize.set(DEFAULT_MAILBOX_QUEUE_CAPACITY); // DEFAULT_AVAILABILITY;
}
@@ -72,17 +59,13 @@ public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.Mailb
@Override
public void onError(Throwable e) {
- _isCompleted.set(true);
- shutdown();
+ _finishLatch.countDown();
+ LOGGER.error("Receiving error msg from grpc mailbox status stream:", e);
throw new RuntimeException(e);
}
- private void shutdown() {
- }
-
@Override
public void onCompleted() {
- _isCompleted.set(true);
- shutdown();
+ _finishLatch.countDown();
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index e37fa15133..177926e0f3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -118,7 +118,7 @@ public class MailboxSendOperator extends MultiStageOperator {
@Override
public List<MultiStageOperator> getChildOperators() {
- return ImmutableList.of();
+ return ImmutableList.of(_dataTableBlockBaseOperator);
}
@Nullable
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 9c82d1cb2a..37e954e6ae 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
public abstract class MultiStageOperator extends BaseOperator<TransferableBlock> implements AutoCloseable {
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MultiStageOperator.class);
- // TODO: use the API public List<? extends Operator> getChildOperators() to merge two APIs.
@Override
public List<MultiStageOperator> getChildOperators() {
throw new UnsupportedOperationException();
@@ -41,9 +40,20 @@ public abstract class MultiStageOperator extends BaseOperator<TransferableBlock>
try {
op.close();
} catch (Exception e) {
- LOGGER.error("Failed to close operator:" + op);
+ LOGGER.error("Failed to close operator: " + op + " with exception:" + e);
// Continue processing because even one operator failed to be close, we should still close the rest.
}
}
}
+
+ public void cancel(Throwable e) {
+ for (MultiStageOperator op : getChildOperators()) {
+ try {
+ op.cancel(e);
+ } catch (Exception e2) {
+ LOGGER.error("Failed to cancel operator:" + op + "with error:" + e + " with exception:" + e2);
+ // Continue processing: even if one operator fails to be cancelled, we should still cancel the rest.
+ }
+ }
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index ae6bae362f..ddbe1939ff 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -31,7 +31,6 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
* by send/receive stages.
*/
public class OpChain implements AutoCloseable {
-
private final MultiStageOperator _root;
private final Set<MailboxIdentifier> _receivingMailbox;
private final OpChainStats _stats;
@@ -62,8 +61,19 @@ public class OpChain implements AutoCloseable {
return "OpChain{" + _id + "}";
}
+ /**
+ * close() is called when we finish execution successfully.
+ */
@Override
public void close() {
_root.close();
}
+
+ /**
+ * cancel() is called when execution runs into an error.
+ * @param e
+ */
+ public void cancel(Throwable e) {
+ _root.cancel(e);
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 13f4306e01..1dee1e60c5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -82,6 +82,10 @@ public class SortOperator extends MultiStageOperator {
return ImmutableList.of(_upstreamOperator);
}
+ @Override
+ public void cancel(Throwable e) {
+ }
+
@Nullable
@Override
public String toExplainString() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org