You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/04 04:51:54 UTC
[pinot] branch master updated: [cleanup] last place to clean up OpChain wake up callback (#10710)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b190599cf1 [cleanup] last place to clean up OpChain wake up callback (#10710)
b190599cf1 is described below
commit b190599cf1ac2a16fc3ab43320dd7fde092dd734
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed May 3 21:51:45 2023 -0700
[cleanup] last place to clean up OpChain wake up callback (#10710)
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../java/org/apache/pinot/query/mailbox/MailboxService.java | 5 +++--
.../org/apache/pinot/query/mailbox/ReceivingMailbox.java | 9 +++++----
.../pinot/query/runtime/executor/OpChainScheduler.java | 7 ++++---
.../query/runtime/executor/OpChainSchedulerService.java | 12 ++++++------
.../pinot/query/runtime/executor/RoundRobinScheduler.java | 8 +++-----
.../org/apache/pinot/query/mailbox/MailboxServiceTest.java | 5 +++--
.../query/runtime/executor/RoundRobinSchedulerTest.java | 8 ++++----
7 files changed, 28 insertions(+), 26 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index c8e6cf6927..aa142c11ac 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
import org.apache.pinot.query.mailbox.channel.GrpcMailboxServer;
+import org.apache.pinot.query.runtime.operator.OpChainId;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,12 +56,12 @@ public class MailboxService {
private final String _hostname;
private final int _port;
private final PinotConfiguration _config;
- private final Consumer<String> _receiveMailCallback;
+ private final Consumer<OpChainId> _receiveMailCallback;
private final ChannelManager _channelManager = new ChannelManager();
private GrpcMailboxServer _grpcMailboxServer;
- public MailboxService(String hostname, int port, PinotConfiguration config, Consumer<String> receiveMailCallback) {
+ public MailboxService(String hostname, int port, PinotConfiguration config, Consumer<OpChainId> receiveMailCallback) {
_hostname = hostname;
_port = port;
_config = config;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index df7d0b93c2..fcba7c0a3d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -27,6 +27,7 @@ import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.OpChainId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,13 +46,13 @@ public class ReceivingMailbox {
TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("Cancelled by receiver"));
private final String _id;
- private final Consumer<String> _receiveMailCallback;
+ private final Consumer<OpChainId> _receiveMailCallback;
// TODO: Make the queue size configurable
// TODO: Revisit if this is the correct way to apply back pressure
private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
- public ReceivingMailbox(String id, Consumer<String> receiveMailCallback) {
+ public ReceivingMailbox(String id, Consumer<OpChainId> receiveMailCallback) {
_id = id;
_receiveMailCallback = receiveMailCallback;
}
@@ -78,7 +79,7 @@ public class ReceivingMailbox {
try {
if (_blocks.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
if (_errorBlock.get() == null) {
- _receiveMailCallback.accept(_id);
+ _receiveMailCallback.accept(MailboxIdUtils.toOpChainId(_id));
return true;
} else {
LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
@@ -104,7 +105,7 @@ public class ReceivingMailbox {
public void setErrorBlock(TransferableBlock errorBlock) {
if (_errorBlock.compareAndSet(null, errorBlock)) {
_blocks.clear();
- _receiveMailCallback.accept(_id);
+ _receiveMailCallback.accept(MailboxIdUtils.toOpChainId(_id));
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
index e32c04f9be..b19ea88263 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.operator.OpChainId;
/**
@@ -52,12 +53,12 @@ public interface OpChainScheduler {
void yield(OpChain opChain);
/**
- * A callback called whenever data is received for the given mailbox. This can be used by the scheduler
+ * A callback called whenever data is received for the given opChain. This can be used by the scheduler
* implementations to re-schedule suspended OpChains. This method may be called for an OpChain that has not yet
* been scheduled, or an OpChain that has already been de-registered.
- * @param mailboxId the mailbox ID
+ * @param opChainId the {@link OpChain} ID
*/
- void onDataAvailable(String mailboxId);
+ void onDataAvailable(OpChainId opChainId);
/**
* Returns an OpChain that is ready to be run by {@link OpChainSchedulerService}, waiting for the given time if
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index bd4dee17df..a886269ab9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.operator.OpChainId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -152,14 +153,13 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
}
/**
- * This method should be called whenever data is available in a given mailbox.
- * Implementations of this method should be idempotent, it may be called in the
- * scenario that no mail is available.
+ * This method should be called whenever data is available for an {@link OpChain} to consume.
+ * Implementations of this method should be idempotent; it may be called even when no data is available.
*
- * @param mailboxId the identifier of the mailbox that now has data
+ * @param opChainId the identifier of the operator chain
*/
- public final void onDataAvailable(String mailboxId) {
- _scheduler.onDataAvailable(mailboxId);
+ public final void onDataAvailable(OpChainId opChainId) {
+ _scheduler.onDataAvailable(opChainId);
}
// TODO: remove this method after we pipe down the proper executor pool to the v1 engine
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
index 79576ca373..a1934d884a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
@@ -35,7 +35,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.operator.OpChainId;
import org.slf4j.Logger;
@@ -191,12 +190,11 @@ public class RoundRobinScheduler implements OpChainScheduler {
}
@Override
- public void onDataAvailable(String mailboxId) {
- OpChainId opChainId = MailboxIdUtils.toOpChainId(mailboxId);
+ public void onDataAvailable(OpChainId opChainId) {
// If this chain isn't alive as per the scheduler, don't do anything. If the OpChain is registered after this, it
// will anyways be scheduled to run since new OpChains are run immediately.
if (!_aliveChains.containsKey(opChainId)) {
- trace("got mail, but the OpChain is not registered so ignoring the event " + mailboxId);
+ trace("woken up but the OpChain is not registered so ignoring the event: " + opChainId);
return;
}
_lock.lock();
@@ -217,7 +215,7 @@ public class RoundRobinScheduler implements OpChainScheduler {
} finally {
_lock.unlock();
}
- trace("got mail for " + mailboxId);
+ trace("got data for " + opChainId);
}
@Override
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
index b0c45013e9..0b77996c22 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.OpChainId;
import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.query.testutils.QueryTestUtils;
@@ -46,7 +47,7 @@ public class MailboxServiceTest {
private static final DataSchema DATA_SCHEMA =
new DataSchema(new String[]{"testColumn"}, new ColumnDataType[]{ColumnDataType.INT});
- private final AtomicReference<Consumer<String>> _receiveMailCallback1 = new AtomicReference<>();
+ private final AtomicReference<Consumer<OpChainId>> _receiveMailCallback1 = new AtomicReference<>();
private MailboxService _mailboxService1;
private MailboxService _mailboxService2;
@@ -58,7 +59,7 @@ public class MailboxServiceTest {
PinotConfiguration config = new PinotConfiguration(
Collections.singletonMap(QueryConfig.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, 4_000_000));
_mailboxService1 = new MailboxService("localhost", QueryTestUtils.getAvailablePort(), config,
- mailboxId -> _receiveMailCallback1.get().accept(mailboxId));
+ opChainId -> _receiveMailCallback1.get().accept(opChainId));
_mailboxService1.start();
_mailboxService2 = new MailboxService("localhost", QueryTestUtils.getAvailablePort(), config, mailboxId -> {
});
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 807b361e63..6589fe079f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -88,7 +88,7 @@ public class RoundRobinSchedulerTest {
// When Op-Chain is done executing, yield is called
_scheduler.yield(chain);
// When data is received, callback is called
- _scheduler.onDataAvailable(MAILBOX_1);
+ _scheduler.onDataAvailable(MailboxIdUtils.toOpChainId(MAILBOX_1));
// next should return the OpChain immediately after the callback
Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain);
// Say the OpChain is done, then a de-register will be called
@@ -160,9 +160,9 @@ public class RoundRobinSchedulerTest {
_scheduler.yield(chain3);
_scheduler.yield(chain2);
// Data may be received in arbitrary order
- _scheduler.onDataAvailable(MAILBOX_2);
- _scheduler.onDataAvailable(MAILBOX_3);
- _scheduler.onDataAvailable(MAILBOX_1);
+ _scheduler.onDataAvailable(MailboxIdUtils.toOpChainId(MAILBOX_2));
+ _scheduler.onDataAvailable(MailboxIdUtils.toOpChainId(MAILBOX_3));
+ _scheduler.onDataAvailable(MailboxIdUtils.toOpChainId(MAILBOX_1));
// Subsequent polls would be in the order the callback was processed. A callback here is said to be "processed"
// if it has successfully returned.
Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain2);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org