You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by si...@apache.org on 2023/02/01 18:41:49 UTC

[pinot] branch master updated: [multistage] [refactor] Introduce cancel on error API for MultiStageOperator and sending and receiving mailbox (#10170)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b75ed2cb7 [multistage] [refactor] Introduce cancel on error API for MultiStageOperator and sending and receiving mailbox  (#10170)
7b75ed2cb7 is described below

commit 7b75ed2cb7e8372b0f860e66fd0f191eba5026de
Author: Yao Liu <ya...@startree.ai>
AuthorDate: Wed Feb 1 10:41:42 2023 -0800

    [multistage] [refactor] Introduce cancel on error API for MultiStageOperator and sending and receiving mailbox  (#10170)
    
    * cancel
    
    * clean up
    
    * error format
    
    * fix todo
    
    * all type support and fix
---
 .../pinot/query/mailbox/GrpcMailboxService.java    |  2 +-
 .../pinot/query/mailbox/GrpcReceivingMailbox.java  |  9 +++---
 .../pinot/query/mailbox/GrpcSendingMailbox.java    | 31 ++++++++++++------
 .../query/mailbox/InMemoryReceivingMailbox.java    |  4 +++
 .../query/mailbox/InMemorySendingMailbox.java      | 13 ++++++--
 .../pinot/query/mailbox/ReceivingMailbox.java      |  2 ++
 .../apache/pinot/query/mailbox/SendingMailbox.java |  7 ++++
 .../channel/MailboxContentStreamObserver.java      |  2 --
 .../channel/MailboxStatusStreamObserver.java       | 37 ++++++----------------
 .../runtime/operator/MailboxSendOperator.java      |  2 +-
 .../query/runtime/operator/MultiStageOperator.java | 14 ++++++--
 .../pinot/query/runtime/operator/OpChain.java      | 12 ++++++-
 .../pinot/query/runtime/operator/SortOperator.java |  4 +++
 13 files changed, 90 insertions(+), 49 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
index 280210f22f..fc42388a11 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -98,7 +98,7 @@ public class GrpcMailboxService implements MailboxService<TransferableBlock> {
    */
   public ReceivingMailbox<TransferableBlock> getReceivingMailbox(MailboxIdentifier mailboxId) {
     return _receivingMailboxMap.computeIfAbsent(
-        mailboxId.toString(), (mId) -> new GrpcReceivingMailbox(mId, this, _gotMailCallback));
+        mailboxId.toString(), (mId) -> new GrpcReceivingMailbox(mId, _gotMailCallback));
   }
 
   public ManagedChannel getChannel(String mailboxId) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
index ce0152f1e6..60e4b444ca 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
@@ -40,7 +40,6 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
  */
 public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
   private static final long DEFAULT_MAILBOX_INIT_TIMEOUT = 100L;
-  private final GrpcMailboxService _mailboxService;
   private final String _mailboxId;
   private Consumer<MailboxIdentifier> _gotMailCallback;
   private final CountDownLatch _initializationLatch;
@@ -48,9 +47,7 @@ public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock>
 
   private MailboxContentStreamObserver _contentStreamObserver;
 
-  public GrpcReceivingMailbox(String mailboxId, GrpcMailboxService mailboxService,
-      Consumer<MailboxIdentifier> gotMailCallback) {
-    _mailboxService = mailboxService;
+  public GrpcReceivingMailbox(String mailboxId, Consumer<MailboxIdentifier> gotMailCallback) {
     _mailboxId = mailboxId;
     _gotMailCallback = gotMailCallback;
     _initializationLatch = new CountDownLatch(1);
@@ -94,6 +91,10 @@ public class GrpcReceivingMailbox implements ReceivingMailbox<TransferableBlock>
     return isInitialized() && _contentStreamObserver.isCompleted();
   }
 
+  @Override
+  public void cancel(Throwable e) {
+  }
+
   private boolean waitForInitialize()
       throws Exception {
     if (_initializationLatch.getCount() > 0) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index ca8e72cef8..1f446952a8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -20,7 +20,10 @@ package org.apache.pinot.query.mailbox;
 
 import com.google.protobuf.ByteString;
 import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
 import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.common.datablock.DataBlock;
@@ -42,7 +45,9 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
   private final AtomicBoolean _initialized = new AtomicBoolean(false);
   private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
 
-  private MailboxStatusStreamObserver _statusStreamObserver;
+  private CountDownLatch _finishLatch = new CountDownLatch(1);
+
+  private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
 
   public GrpcSendingMailbox(String mailboxId, GrpcMailboxService mailboxService) {
     _mailboxService = mailboxService;
@@ -54,13 +59,11 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
       throws UnsupportedOperationException {
     ManagedChannel channel = _mailboxService.getChannel(_mailboxId);
     PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
-    _statusStreamObserver = new MailboxStatusStreamObserver();
-    _statusStreamObserver.init(stub.open(_statusStreamObserver));
+    _mailboxContentStreamObserver = stub.open(new MailboxStatusStreamObserver(_finishLatch));
+    // TODO: Replace init call with metadata.
     // send a begin-of-stream message.
-    _statusStreamObserver.send(MailboxContent.newBuilder()
-        .setMailboxId(_mailboxId)
-        .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true")
-        .build());
+    _mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
+        .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
     _initialized.set(true);
   }
 
@@ -72,13 +75,13 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
       init();
     }
     MailboxContent data = toMailboxContent(block.getDataBlock());
-    _statusStreamObserver.send(data);
+    _mailboxContentStreamObserver.onNext(data);
     _totalMsgSent.incrementAndGet();
   }
 
   @Override
   public void complete() {
-    _statusStreamObserver.complete();
+    _mailboxContentStreamObserver.onCompleted();
   }
 
   @Override
@@ -86,6 +89,16 @@ public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
     return _mailboxId;
   }
 
+  @Override
+  public void waitForFinish(long timeout, TimeUnit unit)
+      throws InterruptedException {
+    _finishLatch.await(timeout, unit);
+  }
+
+  @Override
+  public void cancel(Throwable t) {
+  }
+
   private MailboxContent toMailboxContent(DataBlock dataBlock) {
     try {
       Mailbox.MailboxContent.Builder builder = Mailbox.MailboxContent.newBuilder().setMailboxId(_mailboxId)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
index ac75c39993..43f32c61c5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
@@ -63,4 +63,8 @@ public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBl
   public boolean isClosed() {
     return _closed && _queue.size() == 0;
   }
+
+  @Override
+  public void cancel(Throwable e) {
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index ebde5b9c41..81528f7106 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.mailbox;
 
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 
@@ -46,8 +47,7 @@ public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock>
     if (!_queue.offer(data)) {
       // this should never happen, since we use a LinkedBlockingQueue
       // which does not have capacity bounds
-      throw new IllegalStateException("Failed to insert into in-memory mailbox "
-          + _mailboxId);
+      throw new IllegalStateException("Failed to insert into in-memory mailbox " + _mailboxId);
     }
     _gotMailCallback.accept(JsonMailboxIdentifier.parse(_mailboxId));
   }
@@ -55,4 +55,13 @@ public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock>
   @Override
   public void complete() {
   }
+
+  @Override
+  public void waitForFinish(long timeout, TimeUnit unit)
+      throws InterruptedException {
+  }
+
+  @Override
+  public void cancel(Throwable t) {
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index ea999807d0..377430883c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -54,4 +54,6 @@ public interface ReceivingMailbox<T> {
    * @return
    */
   boolean isClosed();
+
+  void cancel(Throwable e);
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 11cee29f0a..7d7b547324 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import java.util.concurrent.TimeUnit;
+
+
 /**
  * Mailbox is used to send and receive data.
  *
@@ -46,4 +49,8 @@ public interface SendingMailbox<T> {
    * Complete delivery of the current mailbox.
    */
   void complete();
+
+  void waitForFinish(long timeout, TimeUnit unit) throws InterruptedException;
+
+  void cancel(Throwable t);
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 4a35c8d660..f53a584c51 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -57,7 +57,6 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
         .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build();
   }
 
-  private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
   private final GrpcMailboxService _mailboxService;
   private final StreamObserver<Mailbox.MailboxStatus> _responseObserver;
   private final boolean _isEnabledFeedback;
@@ -183,7 +182,6 @@ public class MailboxContentStreamObserver implements StreamObserver<Mailbox.Mail
       _errorLock.writeLock().lock();
       _errorContent = createErrorContent(e);
       _gotMailCallback.accept(_mailboxId);
-      // TODO: close the stream.
       throw new RuntimeException(e);
     } catch (IOException ioe) {
       throw new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
index b758f0793d..fd7443db12 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.query.mailbox.channel;
 
 import io.grpc.stub.StreamObserver;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.common.proto.Mailbox;
 import org.slf4j.Logger;
@@ -30,31 +30,18 @@ import org.slf4j.LoggerFactory;
  * {@code MailboxStatusStreamObserver} is used by the SendingMailbox to send data over the wire.
  *
  * <p>Once {@link org.apache.pinot.query.mailbox.GrpcSendingMailbox#init()} is called, one instances of this class is
- * created based on the opened GRPC connection returned {@link StreamObserver}. From this point, the sending mailbox
- * can use the {@link MailboxStatusStreamObserver#send(Mailbox.MailboxContent)} API to send data packet to the receiving
+ * created based on the opened GRPC connection returned {@link StreamObserver}.
  * end.
  */
 public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.MailboxStatus> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxStatusStreamObserver.class);
   private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
   private final AtomicInteger _bufferSize = new AtomicInteger(5);
-  private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
 
-  private StreamObserver<Mailbox.MailboxContent> _mailboxContentStreamObserver;
+  private CountDownLatch _finishLatch;
 
-  public MailboxStatusStreamObserver() {
-  }
-
-  public void init(StreamObserver<Mailbox.MailboxContent> mailboxContentStreamObserver) {
-    _mailboxContentStreamObserver = mailboxContentStreamObserver;
-  }
-
-  public void send(Mailbox.MailboxContent mailboxContent) {
-    _mailboxContentStreamObserver.onNext(mailboxContent);
-  }
-
-  public void complete() {
-    _mailboxContentStreamObserver.onCompleted();
+  public MailboxStatusStreamObserver(CountDownLatch finishLatch) {
+    _finishLatch = finishLatch;
   }
 
   @Override
@@ -63,8 +50,8 @@ public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.Mailb
     // so we can make better throughput send judgement. here is a simple example.
     // TODO: this feedback info is not used to throttle the send speed. it is currently being discarded.
     if (mailboxStatus.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY)) {
-      _bufferSize.set(Integer.parseInt(
-          mailboxStatus.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY)));
+      _bufferSize.set(
+          Integer.parseInt(mailboxStatus.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY)));
     } else {
       _bufferSize.set(DEFAULT_MAILBOX_QUEUE_CAPACITY); // DEFAULT_AVAILABILITY;
     }
@@ -72,17 +59,13 @@ public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.Mailb
 
   @Override
   public void onError(Throwable e) {
-    _isCompleted.set(true);
-    shutdown();
+    _finishLatch.countDown();
+    LOGGER.error("Receiving error msg from grpc mailbox status stream:", e);
     throw new RuntimeException(e);
   }
 
-  private void shutdown() {
-  }
-
   @Override
   public void onCompleted() {
-    _isCompleted.set(true);
-    shutdown();
+    _finishLatch.countDown();
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index e37fa15133..177926e0f3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -118,7 +118,7 @@ public class MailboxSendOperator extends MultiStageOperator {
 
   @Override
   public List<MultiStageOperator> getChildOperators() {
-    return ImmutableList.of();
+    return ImmutableList.of(_dataTableBlockBaseOperator);
   }
 
   @Nullable
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 9c82d1cb2a..37e954e6ae 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
 public abstract class MultiStageOperator extends BaseOperator<TransferableBlock> implements AutoCloseable {
   private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MultiStageOperator.class);
 
-  // TODO: use the API public List<? extends Operator> getChildOperators() to merge two APIs.
   @Override
   public List<MultiStageOperator> getChildOperators() {
     throw new UnsupportedOperationException();
@@ -41,9 +40,20 @@ public abstract class MultiStageOperator extends BaseOperator<TransferableBlock>
       try {
         op.close();
       } catch (Exception e) {
-        LOGGER.error("Failed to close operator:" + op);
+        LOGGER.error("Failed to close operator: " + op + " with exception:" + e);
         // Continue processing because even one operator failed to be close, we should still close the rest.
       }
     }
   }
+
+  public void cancel(Throwable e) {
+    for (MultiStageOperator op : getChildOperators()) {
+      try {
+        op.cancel(e);
+      } catch (Exception e2) {
+        LOGGER.error("Failed to cancel operator:" + op + "with error:" + e + " with exception:" + e2);
+        // Continue processing because even one operator failed to be cancelled, we should still cancel the rest.
+      }
+    }
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index ae6bae362f..ddbe1939ff 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -31,7 +31,6 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
  * by send/receive stages.
  */
 public class OpChain implements AutoCloseable {
-
   private final MultiStageOperator _root;
   private final Set<MailboxIdentifier> _receivingMailbox;
   private final OpChainStats _stats;
@@ -62,8 +61,19 @@ public class OpChain implements AutoCloseable {
     return "OpChain{" + _id + "}";
   }
 
+  /**
+   * close() is called when we finish execution successfully.
+   */
   @Override
   public void close() {
     _root.close();
   }
+
+  /**
+   * cancel() is called when execution runs into error.
+   * @param e
+   */
+  public void cancel(Throwable e) {
+    _root.cancel(e);
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 13f4306e01..1dee1e60c5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -82,6 +82,10 @@ public class SortOperator extends MultiStageOperator {
     return ImmutableList.of(_upstreamOperator);
   }
 
+  @Override
+  public void cancel(Throwable e) {
+  }
+
   @Nullable
   @Override
   public String toExplainString() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org