You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/04 04:51:54 UTC

[pinot] branch master updated: [cleanup] last place to clean up OpChain wake up callback (#10710)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b190599cf1 [cleanup] last place to clean up OpChain wake up callback (#10710)
b190599cf1 is described below

commit b190599cf1ac2a16fc3ab43320dd7fde092dd734
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed May 3 21:51:45 2023 -0700

    [cleanup] last place to clean up OpChain wake up callback (#10710)
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../java/org/apache/pinot/query/mailbox/MailboxService.java  |  5 +++--
 .../org/apache/pinot/query/mailbox/ReceivingMailbox.java     |  9 +++++----
 .../pinot/query/runtime/executor/OpChainScheduler.java       |  7 ++++---
 .../query/runtime/executor/OpChainSchedulerService.java      | 12 ++++++------
 .../pinot/query/runtime/executor/RoundRobinScheduler.java    |  8 +++-----
 .../org/apache/pinot/query/mailbox/MailboxServiceTest.java   |  5 +++--
 .../query/runtime/executor/RoundRobinSchedulerTest.java      |  8 ++++----
 7 files changed, 28 insertions(+), 26 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index c8e6cf6927..aa142c11ac 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.mailbox.channel.GrpcMailboxServer;
+import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,12 +56,12 @@ public class MailboxService {
   private final String _hostname;
   private final int _port;
   private final PinotConfiguration _config;
-  private final Consumer<String> _receiveMailCallback;
+  private final Consumer<OpChainId> _receiveMailCallback;
   private final ChannelManager _channelManager = new ChannelManager();
 
   private GrpcMailboxServer _grpcMailboxServer;
 
-  public MailboxService(String hostname, int port, PinotConfiguration config, Consumer<String> receiveMailCallback) {
+  public MailboxService(String hostname, int port, PinotConfiguration config, Consumer<OpChainId> receiveMailCallback) {
     _hostname = hostname;
     _port = port;
     _config = config;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index df7d0b93c2..fcba7c0a3d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -27,6 +27,7 @@ import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,13 +46,13 @@ public class ReceivingMailbox {
       TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled by receiver"));
 
   private final String _id;
-  private final Consumer<String> _receiveMailCallback;
+  private final Consumer<OpChainId> _receiveMailCallback;
   // TODO: Make the queue size configurable
   // TODO: Revisit if this is the correct way to apply back pressure
   private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
   private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
 
-  public ReceivingMailbox(String id, Consumer<String> receiveMailCallback) {
+  public ReceivingMailbox(String id, Consumer<OpChainId> receiveMailCallback) {
     _id = id;
     _receiveMailCallback = receiveMailCallback;
   }
@@ -78,7 +79,7 @@ public class ReceivingMailbox {
     try {
       if (_blocks.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
         if (_errorBlock.get() == null) {
-          _receiveMailCallback.accept(_id);
+          _receiveMailCallback.accept(MailboxIdUtils.toOpChainId(_id));
           return true;
         } else {
           LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
@@ -104,7 +105,7 @@ public class ReceivingMailbox {
   public void setErrorBlock(TransferableBlock errorBlock) {
     if (_errorBlock.compareAndSet(null, errorBlock)) {
       _blocks.clear();
-      _receiveMailCallback.accept(_id);
+      _receiveMailCallback.accept(MailboxIdUtils.toOpChainId(_id));
     }
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
index e32c04f9be..b19ea88263 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.operator.OpChainId;
 
 
 /**
@@ -52,12 +53,12 @@ public interface OpChainScheduler {
   void yield(OpChain opChain);
 
   /**
-   * A callback called whenever data is received for the given mailbox. This can be used by the scheduler
+   * A callback called whenever data is received for the given opChain. This can be used by the scheduler
    * implementations to re-scheduled suspended OpChains. This method may be called for an OpChain that has not yet
    * been scheduled, or an OpChain that has already been de-registered.
-   * @param mailboxId the mailbox ID
+   * @param opChainId the {@link OpChain} ID
    */
-  void onDataAvailable(String mailboxId);
+  void onDataAvailable(OpChainId opChainId);
 
   /**
    * Returns an OpChain that is ready to be run by {@link OpChainSchedulerService}, waiting for the given time if
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index bd4dee17df..a886269ab9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -152,14 +153,13 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
   }
 
   /**
-   * This method should be called whenever data is available in a given mailbox.
-   * Implementations of this method should be idempotent, it may be called in the
-   * scenario that no mail is available.
+   * This method should be called whenever data is available for an {@link OpChain} to consume.
+   * Implementations of this method should be idempotent, it may be called in the scenario that no data is available.
    *
-   * @param mailboxId the identifier of the mailbox that now has data
+   * @param opChainId the identifier of the operator chain
    */
-  public final void onDataAvailable(String mailboxId) {
-    _scheduler.onDataAvailable(mailboxId);
+  public final void onDataAvailable(OpChainId opChainId) {
+    _scheduler.onDataAvailable(opChainId);
   }
 
   // TODO: remove this method after we pipe down the proper executor pool to the v1 engine
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
index 79576ca373..a1934d884a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
@@ -35,7 +35,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.slf4j.Logger;
@@ -191,12 +190,11 @@ public class RoundRobinScheduler implements OpChainScheduler {
   }
 
   @Override
-  public void onDataAvailable(String mailboxId) {
-    OpChainId opChainId = MailboxIdUtils.toOpChainId(mailboxId);
+  public void onDataAvailable(OpChainId opChainId) {
     // If this chain isn't alive as per the scheduler, don't do anything. If the OpChain is registered after this, it
     // will anyways be scheduled to run since new OpChains are run immediately.
     if (!_aliveChains.containsKey(opChainId)) {
-      trace("got mail, but the OpChain is not registered so ignoring the event " + mailboxId);
+      trace("woken up but the OpChain is not registered so ignoring the event: " + opChainId);
       return;
     }
     _lock.lock();
@@ -217,7 +215,7 @@ public class RoundRobinScheduler implements OpChainScheduler {
     } finally {
       _lock.unlock();
     }
-    trace("got mail for " + mailboxId);
+    trace("got data for " + opChainId);
   }
 
   @Override
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index b0c45013e9..0b77996c22 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.query.testutils.QueryTestUtils;
@@ -46,7 +47,7 @@ public class MailboxServiceTest {
   private static final DataSchema DATA_SCHEMA =
       new DataSchema(new String[]{"testColumn"}, new ColumnDataType[]{ColumnDataType.INT});
 
-  private final AtomicReference<Consumer<String>> _receiveMailCallback1 = new AtomicReference<>();
+  private final AtomicReference<Consumer<OpChainId>> _receiveMailCallback1 = new AtomicReference<>();
 
   private MailboxService _mailboxService1;
   private MailboxService _mailboxService2;
@@ -58,7 +59,7 @@ public class MailboxServiceTest {
     PinotConfiguration config = new PinotConfiguration(
         Collections.singletonMap(QueryConfig.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, 4_000_000));
     _mailboxService1 = new MailboxService("localhost", QueryTestUtils.getAvailablePort(), config,
-        mailboxId -> _receiveMailCallback1.get().accept(mailboxId));
+        opChainId -> _receiveMailCallback1.get().accept(opChainId));
     _mailboxService1.start();
     _mailboxService2 = new MailboxService("localhost", QueryTestUtils.getAvailablePort(), config, mailboxId -> {
     });
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 807b361e63..6589fe079f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -88,7 +88,7 @@ public class RoundRobinSchedulerTest {
     // When Op-Chain is done executing, yield is called
     _scheduler.yield(chain);
     // When data is received, callback is called
-    _scheduler.onDataAvailable(MAILBOX_1);
+    _scheduler.onDataAvailable(MailboxIdUtils.toOpChainId(MAILBOX_1));
     // next should return the OpChain immediately after the callback
     Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain);
     // Say the OpChain is done, then a de-register will be called
@@ -160,9 +160,9 @@ public class RoundRobinSchedulerTest {
     _scheduler.yield(chain3);
     _scheduler.yield(chain2);
     // Data may be received in arbitrary order
-    _scheduler.onDataAvailable(MAILBOX_2);
-    _scheduler.onDataAvailable(MAILBOX_3);
-    _scheduler.onDataAvailable(MAILBOX_1);
+    _scheduler.onDataAvailable(MailboxIdUtils.toOpChainId(MAILBOX_2));
+    _scheduler.onDataAvailable(MailboxIdUtils.toOpChainId(MAILBOX_3));
+    _scheduler.onDataAvailable(MailboxIdUtils.toOpChainId(MAILBOX_1));
     // Subsequent polls would be in the order the callback was processed. A callback here is said to be "processed"
     // if it has successfully returned.
     Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain2);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org