You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/02/22 03:50:54 UTC
[pinot] branch master updated: [multistage] Fix Issues with OpChainScheduler (#10289)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1aa1368a1a [multistage] Fix Issues with OpChainScheduler (#10289)
1aa1368a1a is described below
commit 1aa1368a1a4f6467a2e49866f3de51f4d519942d
Author: Ankit Sultana <an...@uber.com>
AuthorDate: Wed Feb 22 09:20:47 2023 +0530
[multistage] Fix Issues with OpChainScheduler (#10289)
* [multistage] Fix Issues with OpChainScheduler
* Remove _running
* Add UTs
* Add support for parallelism > 1 + fix tests + address comments
* Add Nullable annotation to the interface
* Address feedback
* Add a state transition diagram
* Prevent race-condition in Stopwatch
* Fix OpChainStats unintended changes + typo in previous commit
---
.../apache/pinot/query/runtime/QueryRunner.java | 2 +-
.../query/runtime/executor/OpChainScheduler.java | 61 ++--
.../runtime/executor/OpChainSchedulerService.java | 177 +++++-------
.../runtime/executor/RoundRobinScheduler.java | 315 ++++++++++++---------
.../pinot/query/runtime/operator/OpChain.java | 13 +-
.../pinot/query/runtime/operator/OpChainId.java | 57 ++++
.../pinot/query/runtime/operator/OpChainStats.java | 18 +-
.../query/runtime/plan/PhysicalPlanVisitor.java | 3 +-
.../apache/pinot/query/service/QueryConfig.java | 2 +-
.../apache/pinot/query/QueryServerEnclosure.java | 3 +-
.../executor/OpChainSchedulerServiceTest.java | 160 ++++-------
.../runtime/executor/RoundRobinSchedulerTest.java | 212 +++++++-------
12 files changed, 538 insertions(+), 485 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 0a18ac555a..d171336a52 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -106,7 +106,7 @@ public class QueryRunner {
_executorService = Executors.newFixedThreadPool(
ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
new NamedThreadFactory("query_worker_on_" + _port + "_port"));
- _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), _executorService, releaseMs);
+ _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), _executorService);
_mailboxService = MultiplexingMailboxService.newInstance(_hostname, _port, config, _scheduler::onDataAvailable);
_serverExecutor = new ServerQueryExecutorV1Impl();
_serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
index 635e5aacfb..0f522076e2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java
@@ -18,52 +18,69 @@
*/
package org.apache.pinot.query.runtime.executor;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.runtime.operator.OpChain;
/**
- * An interface that defines different scheduling strategies to work with the
- * {@link OpChainSchedulerService}. All methods are thread safe and can be guaranteed
- * to never be called concurrently - therefore all implementations may use data
- * structures that are not concurrent.
+ * An interface that defines different scheduling strategies to work with the {@link OpChainSchedulerService}.
*/
+@ThreadSafe
public interface OpChainScheduler {
-
/**
+ * Registers a new OpChain with the scheduler.
* @param operatorChain the operator chain to register
- * @param isNew whether or not this is the first time the operator is scheduled
*/
- void register(OpChain operatorChain, boolean isNew);
+ void register(OpChain operatorChain);
/**
- * This method is called whenever {@code mailbox} has new data available to consume,
- * this can be useful for advanced scheduling algorithms
- *
- * @param mailbox the mailbox ID
+ * When the OpChain is finished, error or otherwise, deregister is called for it so the scheduler can do any required
+ * cleanup. After an OpChain is de-registered, the scheduler service will never call any other method for it.
+ * However, the {@link #onDataAvailable} callback may be called even after an OpChain is de-registered, and the
+ * scheduler should handle that scenario.
+ * @param operatorChain an operator chain that is finished (error or otherwise).
*/
- void onDataAvailable(MailboxIdentifier mailbox);
+ void deregister(OpChain operatorChain);
/**
- * This method is called when scheduler is terminating. It should clean up all of the resources if there are any.
- * register() and onDataAvailable() shouldn't be called anymore after shutDown is called.
+ * Used by {@link OpChainSchedulerService} to indicate that a given OpChain can be suspended until it receives some
+ * data. Note that this method is only used by the scheduler service to "indicate" that an OpChain can be suspended.
+ * The decision on whether to actually suspend or not can be taken by the scheduler.
*/
- void shutDown();
+ void yield(OpChain opChain);
/**
- * @return whether or not there is any work for the scheduler to do
+ * A callback called whenever data is received for the given mailbox. This can be used by the scheduler
+ * implementations to re-schedule suspended OpChains. This method may be called for an OpChain that has not yet
+ * been scheduled, or an OpChain that has already been de-registered.
+ * @param mailbox the mailbox ID
*/
- boolean hasNext();
+ void onDataAvailable(MailboxIdentifier mailbox);
/**
- * @return the next operator chain to process
- * @throws java.util.NoSuchElementException if {@link #hasNext()} returns false
- * prior to this call
+ * Returns an OpChain that is ready to be run by {@link OpChainSchedulerService}, waiting for the given time if
+ * there are no such OpChains ready yet. Will return null if there are no ready OpChains even after the specified time.
+ *
+ * @param time non-negative value that determines the time the scheduler will wait for new OpChains to be ready.
+ * @param timeUnit TimeUnit for the await time.
+ * @return a non-null OpChain that's ready to be run, or null if there's no OpChain ready even after waiting for the
+ * given time.
+ * @throws InterruptedException if the wait for a ready OpChain was interrupted.
*/
- OpChain next();
+ @Nullable
+ OpChain next(long time, TimeUnit timeUnit)
+ throws InterruptedException;
/**
- * @return the number of operator chains that are awaiting execution
+ * @return the number of operator chains registered with the scheduler
*/
int size();
+
+ /**
+ * TODO: Figure out shutdown flow in context of graceful shutdown.
+ */
+ void shutdownNow();
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index cfc3959c93..5c3e9fa29f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -19,8 +19,6 @@
package org.apache.pinot.query.runtime.executor;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
-import com.google.common.util.concurrent.Monitor;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.core.util.trace.TraceRunnable;
@@ -35,116 +33,78 @@ import org.slf4j.LoggerFactory;
* This class provides the implementation for scheduling multistage queries on a single node based
* on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
* and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
- * chain ({@link OpChainScheduler#next()} will be requested.
- *
- * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ * chain ({@link OpChainScheduler#next}) will be requested.
*/
@SuppressWarnings("UnstableApiUsage")
public class OpChainSchedulerService extends AbstractExecutionThreadService {
-
private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
-
- private static final int TERMINATION_TIMEOUT_SEC = 60;
+ // Default time scheduler is allowed to wait for a runnable OpChain to be available
+ private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
private final OpChainScheduler _scheduler;
private final ExecutorService _workerPool;
- private final long _pollIntervalMs;
-
- // anything that is guarded by this monitor should be non-blocking
- private final Monitor _monitor = new Monitor();
- private final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
- @Override
- public boolean isSatisfied() {
- return _scheduler.hasNext() || !isRunning();
- }
- };
- // Note that workerPool is shut down in this class.
public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
- this(scheduler, workerPool, -1);
- }
-
- public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool, long pollIntervalMs) {
_scheduler = scheduler;
_workerPool = workerPool;
- _pollIntervalMs = pollIntervalMs;
}
@Override
protected void triggerShutdown() {
+ // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
LOGGER.info("Triggered shutdown on OpChainScheduler...");
- // this will just notify all waiters that the scheduler is shutting down
- _monitor.enter();
- _monitor.leave();
- if (!MoreExecutors.shutdownAndAwaitTermination(_workerPool, TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS)) {
- LOGGER.error("Failed to shut down and terminate OpChainScheduler.");
- }
- _scheduler.shutDown();
}
@Override
protected void run()
throws Exception {
while (isRunning()) {
- if (enterMonitor()) {
- try {
- if (!isRunning()) {
- return;
- }
-
- OpChain operatorChain = _scheduler.next();
- LOGGER.trace("({}): Scheduling", operatorChain);
- _workerPool.submit(new TraceRunnable() {
- @Override
- public void runJob() {
- try {
- LOGGER.trace("({}): Executing", operatorChain);
- operatorChain.getStats().executing();
-
- // so long as there's work to be done, keep getting the next block
- // when the operator chain returns a NOOP block, then yield the execution
- // of this to another worker
- TransferableBlock result = operatorChain.getRoot().nextBlock();
- while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
- result = operatorChain.getRoot().nextBlock();
- }
+ OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, TimeUnit.MILLISECONDS);
+ if (operatorChain == null) {
+ continue;
+ }
+ LOGGER.trace("({}): Scheduling", operatorChain);
+ _workerPool.submit(new TraceRunnable() {
+ @Override
+ public void runJob() {
+ boolean isFinished = false;
+ Throwable thrown = null;
+ try {
+ LOGGER.trace("({}): Executing", operatorChain);
+ operatorChain.getStats().executing();
+
+ // so long as there's work to be done, keep getting the next block
+ // when the operator chain returns a NOOP block, then yield the execution
+ // of this to another worker
+ TransferableBlock result = operatorChain.getRoot().nextBlock();
+ while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+ result = operatorChain.getRoot().nextBlock();
+ }
- if (!result.isEndOfStreamBlock()) {
- // not complete, needs to re-register for scheduling
- register(operatorChain, false);
- } else {
- if (result.isErrorBlock()) {
- operatorChain.getRoot().toExplainString();
- LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
- result.getDataBlock().getExceptions());
- } else {
- operatorChain.getRoot().toExplainString();
- operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
- LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
- }
- operatorChain.close();
- }
- } catch (Exception e) {
- operatorChain.close();
- operatorChain.getRoot().toExplainString();
- LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
- }
+ if (!result.isEndOfStreamBlock()) {
+ // TODO: There should be a waiting-for-data state in OpChainStats.
+ operatorChain.getStats().queued();
+ _scheduler.yield(operatorChain);
+ } else {
+ // End of stream, successful or erroneous: mark the chain finished so the finally block closes and
+ // de-registers it. Only an error block is reported as an erroneous completion.
+ isFinished = true;
+ if (result.isErrorBlock()) {
+ LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
+ result.getDataBlock().getExceptions());
+ } else {
+ operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
+ LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
+ }
}
- });
- } finally {
- _monitor.leave();
+ } catch (Exception e) {
+ LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
+ thrown = e;
+ } finally {
+ if (isFinished) {
+ closeOpChain(operatorChain);
+ } else if (thrown != null) {
+ cancelOpChain(operatorChain, thrown);
+ }
+ }
}
- }
- }
- }
-
- private boolean enterMonitor()
- throws InterruptedException {
- if (_pollIntervalMs >= 0) {
- return _monitor.enterWhen(_hasNextOrClosing, _pollIntervalMs, TimeUnit.MILLISECONDS);
- } else {
- _monitor.enterWhen(_hasNextOrClosing);
- return true;
+ });
}
}
@@ -154,24 +114,15 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
* @param operatorChain the chain to register
*/
public final void register(OpChain operatorChain) {
- register(operatorChain, true);
+ operatorChain.getStats().queued();
+ _scheduler.register(operatorChain);
LOGGER.debug("({}): Scheduler is now handling operator chain listening to mailboxes {}. "
- + "There are a total of {} chains awaiting execution.", operatorChain, operatorChain.getReceivingMailbox(),
+ + "There are a total of {} chains awaiting execution.",
+ operatorChain,
+ operatorChain.getReceivingMailbox(),
_scheduler.size());
}
- public final void register(OpChain operatorChain, boolean isNew) {
- _monitor.enter();
- try {
- LOGGER.trace("({}): Registered operator chain (new: {}). Total: {}", operatorChain, isNew, _scheduler.size());
-
- _scheduler.register(operatorChain, isNew);
- } finally {
- operatorChain.getStats().queued();
- _monitor.leave();
- }
- }
-
/**
* This method should be called whenever data is available in a given mailbox.
* Implementations of this method should be idempotent, it may be called in the
@@ -180,17 +131,27 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
* @param mailbox the identifier of the mailbox that now has data
*/
public final void onDataAvailable(MailboxIdentifier mailbox) {
- _monitor.enter();
- try {
- LOGGER.trace("Notified onDataAvailable for mailbox {}", mailbox);
- _scheduler.onDataAvailable(mailbox);
- } finally {
- _monitor.leave();
- }
+ _scheduler.onDataAvailable(mailbox);
}
// TODO: remove this method after we pipe down the proper executor pool to the v1 engine
public ExecutorService getWorkerPool() {
return _workerPool;
}
+
+ private void closeOpChain(OpChain opChain) {
+ try {
+ opChain.close();
+ } finally {
+ _scheduler.deregister(opChain);
+ }
+ }
+
+ private void cancelOpChain(OpChain opChain, Throwable e) {
+ try {
+ opChain.cancel(e);
+ } finally {
+ _scheduler.deregister(opChain);
+ }
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
index c7b77f85e7..863e403c04 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
@@ -19,17 +19,25 @@
package org.apache.pinot.query.runtime.executor;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
-import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.operator.OpChainId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,168 +47,221 @@ import org.slf4j.LoggerFactory;
* but will only schedule them when there is work to be done. The availability
* of work is signaled using the {@link #onDataAvailable(MailboxIdentifier)}
* callback.
+ * <p>
+ * Design: There are 3 states for an OpChain:
+ *
+ * 1. Ready: This state means that an OpChain is ready to be run. If an OpChain is in this state, then it would be in
+ * the _ready queue. The _ready queue is polled on every call to {@link #next}.
+ * 2. Available/Yielded: This is when the OpChain was run and it returned a no-op block, indicating that it has no
+ * new data to process. The {@link OpChainId}s for these OpChains are stored in the _available
+ * map, where the value of the map is the release-timeout.
+ * 3. Running: When the OpChain is returned by the {@link #next} method, it is considered to be in the Running state.
+ *
+ * The following state transitions are possible:
+ * 1. Ready ==> Running: This happens when the OpChain is returned by the {@link #next} method.
+ * 2. Running ==> Available/Yielded: This happens when a running OpChain returns a No-op block, following which a
+ * yield is called and there's no entry for the OpChain in _seenMail.
+ * 3. Available/Yielded ==> Ready: This can happen in two cases: (1) When yield is called but _seenMail has an
+ * entry for the corresponding OpChainId, which means there was some data received
+ * by MailboxReceiveOperator after the last poll. (2) When a sender has died or hasn't
+ * sent any data in the last _releaseTimeoutMs milliseconds.
+ *
+ * |--------------( #yield() )---------|
+ * | |
+ * \/ |
+ * [ START ] --> [ READY ] -----------( #next() )--> [ RUNNING ]
+ * /\ |
+ * | |
+ * ( #seenMail() or #periodic()) |
+ * | |
+ * [ AVAILABLE ] <------( #yield() )---------|
+ * |
+ * |
+ * [ EXIT ] <-----------------( #deregistered() )---------|
+ *
+ * The OpChain is considered "alive" from the time it is registered until it is de-registered. Any reference to the
+ * OpChain or its related metadata is kept only while the OpChain is alive. The {@link #onDataAvailable} callback
+ * can be called before an OpChain was ever registered. In that case, this scheduler will simply ignore the callback,
+ * since once the OpChain gets registered it will anyways be put into the _ready queue immediately. In case the
+ * OpChain never gets registered (e.g. if the broker couldn't dispatch it), as long as the sender cleans up all
+ * resources that it has acquired, there will be no leak, since the scheduler doesn't hold any references for
+ * non-alive OpChains.
+ * </p>
*/
-@NotThreadSafe
+@ThreadSafe
public class RoundRobinScheduler implements OpChainScheduler {
private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinScheduler.class);
- private static final long DEFAULT_RELEASE_TIMEOUT = TimeUnit.MINUTES.toMillis(1);
+ private static final String AVAILABLE_RELEASE_THREAD_NAME = "round-robin-scheduler-release-thread";
- private final long _releaseTimeout;
+ private final long _releaseTimeoutMs;
private final Supplier<Long> _ticker;
- // the _available queue contains operator chains that are available
- // to this scheduler but do not have any data available to schedule
- // while the _ready queue contains the operator chains that are ready
- // to be scheduled (have data, or are first-time scheduled)
- private final Queue<AvailableEntry> _available = new LinkedList<>();
- private final Queue<OpChain> _ready = new LinkedList<>();
-
- private boolean _isShutDown = false;
-
- // using a Set here is acceptable because calling hasNext() and
- // onDataAvailable() cannot occur concurrently - that means that
- // anytime we schedule a new operator based on the presence of
- // mail we can be certain that it will consume all of the mail
- // form that mailbox, even if there are multiple items in it. If,
- // during execution of that operator, more mail appears, then the
- // operator will be rescheduled immediately potentially resulting
- // in a false-positive schedule
- @VisibleForTesting
- final Set<MailboxIdentifier> _seenMail = new HashSet<>();
+ private final Map<OpChainId, OpChain> _aliveChains = new ConcurrentHashMap<>();
+ final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
+ private final Map<OpChainId, Long> _available = new ConcurrentHashMap<>();
- public RoundRobinScheduler() {
- this(DEFAULT_RELEASE_TIMEOUT);
- }
+ private final BlockingQueue<OpChain> _ready = new LinkedBlockingQueue<>();
- public RoundRobinScheduler(long releaseTimeout) {
- this(releaseTimeout, System::currentTimeMillis);
+ private final Lock _lock = new ReentrantLock();
+ private final ScheduledExecutorService _availableOpChainReleaseService;
+
+ public RoundRobinScheduler(long releaseTimeoutMs) {
+ this(releaseTimeoutMs, System::currentTimeMillis);
}
- public RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
- _releaseTimeout = releaseTimeoutMs;
+ RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
+ Preconditions.checkArgument(releaseTimeoutMs > 0, "Release timeout for round-robin scheduler should be > 0ms");
+ _releaseTimeoutMs = releaseTimeoutMs;
_ticker = ticker;
+ _availableOpChainReleaseService = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r);
+ t.setName(AVAILABLE_RELEASE_THREAD_NAME);
+ t.setDaemon(true);
+ return t;
+ });
+ _availableOpChainReleaseService.scheduleAtFixedRate(() -> {
+ List<OpChainId> timedOutWaiting = new ArrayList<>();
+ for (Map.Entry<OpChainId, Long> entry : _available.entrySet()) {
+ if (Thread.interrupted()) {
+ LOGGER.warn("Thread={} interrupted. Scheduler may be shutting down.", AVAILABLE_RELEASE_THREAD_NAME);
+ break;
+ }
+ if (_ticker.get() > entry.getValue()) {
+ timedOutWaiting.add(entry.getKey());
+ }
+ }
+ for (OpChainId opChainId : timedOutWaiting) {
+ _lock.lock();
+ try {
+ if (_available.containsKey(opChainId)) {
+ _available.remove(opChainId);
+ _ready.offer(_aliveChains.get(opChainId));
+ }
+ } finally {
+ _lock.unlock();
+ }
+ }
+ }, _releaseTimeoutMs, _releaseTimeoutMs, TimeUnit.MILLISECONDS);
}
@Override
- public void register(OpChain operatorChain, boolean isNew) {
- if (_isShutDown) {
- return;
- }
- // the first time an operator chain is scheduled, it should
- // immediately be considered ready in case it does not need
- // read from any mailbox (e.g. with a LiteralValueOperator)
- if (isNew) {
- _ready.add(operatorChain);
- } else {
- long releaseTs = _releaseTimeout < 0 ? Long.MAX_VALUE : _ticker.get() + _releaseTimeout;
- _available.add(new AvailableEntry(operatorChain, releaseTs));
+ public void register(OpChain operatorChain) {
+ Preconditions.checkState(!_aliveChains.containsKey(operatorChain.getId()),
+ String.format("Tried to re-register op-chain: %s", operatorChain.getId()));
+ _lock.lock();
+ try {
+ _aliveChains.put(operatorChain.getId(), operatorChain);
+ _ready.offer(operatorChain);
+ } finally {
+ _lock.unlock();
}
trace("registered " + operatorChain);
}
@Override
- public void onDataAvailable(MailboxIdentifier mailbox) {
- // it may be possible to receive this callback when there's no corresponding
- // operator chain registered to the mailbox - this can happen when either
- // (1) we get the callback before the first register is called or (2) we get
- // the callback while the operator chain is executing. to account for this,
- // we just store it in a set of seen mail and only check for it when hasNext
- // is called.
- //
- // note that scenario (2) may cause a false-positive schedule where an operator
- // chain gets scheduled for mail that it had already processed, in which case
- // the operator chain will simply do nothing and get put back onto the queue.
- // scenario (2) may additionally cause a memory leak - if onDataAvailable is
- // called with an EOS block _while_ the operator chain is executing, the chain
- // will consume the EOS block and computeReady() will never remove the mailbox
- // from the _seenMail set.
- //
- // TODO: fix the memory leak by adding a close(opChain) callback
- _seenMail.add(mailbox);
- trace("got mail for " + mailbox);
+ public void deregister(OpChain operatorChain) {
+ Preconditions.checkState(_aliveChains.containsKey(operatorChain.getId()),
+ "Tried to de-register an un-registered op-chain");
+ _lock.lock();
+ try {
+ OpChainId chainId = operatorChain.getId();
+ _aliveChains.remove(chainId);
+ // it could be that the onDataAvailable callback was called when the OpChain was executing, in which case there
+ // could be a dangling entry in _seenMail.
+ _seenMail.remove(chainId);
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ @Override
+ public void yield(OpChain operatorChain) {
+ long releaseTs = _ticker.get() + _releaseTimeoutMs;
+ _lock.lock();
+ try {
+ // It could be that this OpChain received data before it could be yielded completely. In that case, mark it ready
+ // to get it scheduled asap.
+ if (_seenMail.contains(operatorChain.getId())) {
+ _seenMail.remove(operatorChain.getId());
+ _ready.add(operatorChain);
+ return;
+ }
+ _available.put(operatorChain.getId(), releaseTs);
+ } finally {
+ _lock.unlock();
+ }
}
@Override
- public boolean hasNext() {
- if (!_ready.isEmpty()) {
- return true;
+ public void onDataAvailable(MailboxIdentifier mailbox) {
+ // TODO: Should we add an API in MailboxIdentifier to get the requestId?
+ OpChainId opChainId = new OpChainId(Long.parseLong(mailbox.getJobId().split("_")[0]),
+ mailbox.getToHost().virtualId(), mailbox.getReceiverStageId());
+ // If this chain isn't alive as per the scheduler, don't do anything. If the OpChain is registered after this, it
+ // will anyways be scheduled to run since new OpChains are run immediately.
+ if (!_aliveChains.containsKey(opChainId)) {
+ trace("got mail, but the OpChain is not registered so ignoring the event " + mailbox);
+ return;
}
- computeReady();
- return !_ready.isEmpty();
+ _lock.lock();
+ try {
+ if (!_aliveChains.containsKey(opChainId)) {
+ return;
+ }
+ if (_available.containsKey(opChainId)) {
+ _available.remove(opChainId);
+ _ready.offer(_aliveChains.get(opChainId));
+ } else {
+ // There are two cases here:
+ // 1. OpChain is in the _ready queue: the _seenMail entry stays until the chain has run and either yields
+ // (which consumes the entry) or is de-registered.
+ // 2. OpChain is running: the next time yield is called for it, we'll check against _seenMail and put it back
+ // in the _ready queue again.
+ _seenMail.add(opChainId);
+ }
+ } finally {
+ _lock.unlock();
+ }
+ trace("got mail for " + mailbox);
}
@Override
- public OpChain next() {
- OpChain op = _ready.poll();
- trace("Polled " + op);
- return op;
+ public OpChain next(long time, TimeUnit timeUnit) throws InterruptedException {
+ return _ready.poll(time, timeUnit);
}
@Override
public int size() {
- return _ready.size() + _available.size();
+ return _aliveChains.size();
}
@Override
- public void shutDown() {
- if (_isShutDown) {
- return;
- }
- while (!_ready.isEmpty()) {
- _ready.poll().close();
- }
- while (!_available.isEmpty()) {
- _available.poll()._opChain.close();
- }
- _isShutDown = true;
+ public void shutdownNow() {
+ // TODO: Figure out shutdown flow in context of graceful shutdown.
+ _availableOpChainReleaseService.shutdownNow();
}
- private void computeReady() {
- Iterator<AvailableEntry> availableChains = _available.iterator();
-
- // the algorithm here iterates through all available chains and checks
- // to see whether or not any of the available chains have seen mail for
- // at least one of the mailboxes they receive from - if they do, then
- // we should make that chain available and remove any mail from the
- // mailboxes that it would consume from (after it is scheduled, all
- // mail available to it will have been consumed).
- while (availableChains.hasNext()) {
- AvailableEntry chain = availableChains.next();
- Sets.SetView<MailboxIdentifier> intersect = Sets.intersection(chain._opChain.getReceivingMailbox(), _seenMail);
-
- if (!intersect.isEmpty()) {
- // use an immutable copy because set views use the underlying sets
- // directly, which would cause a concurrent modification exception
- // when removing data from _seenMail
- _seenMail.removeAll(intersect.immutableCopy());
- _ready.add(chain._opChain);
- availableChains.remove();
- } else if (_ticker.get() > chain._releaseTs) {
- LOGGER.warn("({}) Scheduling operator chain reading from {} after timeout. Ready: {}, Available: {}, Mail: {}.",
- chain._opChain, chain._opChain.getReceivingMailbox(), _ready, _available, _seenMail);
- _ready.add(chain._opChain);
- availableChains.remove();
- }
- }
+ @VisibleForTesting
+ int readySize() {
+ return _ready.size();
}
- private void trace(String operation) {
- LOGGER.trace("({}) Ready: {}, Available: {}, Mail: {}", operation, _ready, _available, _seenMail);
+ @VisibleForTesting
+ int availableSize() {
+ return _available.size();
}
- private static class AvailableEntry {
-
- final OpChain _opChain;
- final long _releaseTs;
+ @VisibleForTesting
+ int seenMailSize() {
+ return _seenMail.size();
+ }
- private AvailableEntry(OpChain opChain, long releaseTs) {
- _opChain = opChain;
- _releaseTs = releaseTs;
- }
+ @VisibleForTesting
+ int aliveChainsSize() {
+ return _aliveChains.size();
+ }
- @Override
- public String toString() {
- return _opChain.toString();
- }
+ private void trace(String operation) {
+ LOGGER.trace("({}) Ready: {}, Available: {}, Mail: {}",
+ operation, _ready, _available, _seenMail);
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index ddbe1939ff..6de4e92040 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -34,13 +34,14 @@ public class OpChain implements AutoCloseable {
private final MultiStageOperator _root;
private final Set<MailboxIdentifier> _receivingMailbox;
private final OpChainStats _stats;
- private final String _id;
+ private final OpChainId _id;
- public OpChain(MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes, long requestId, int stageId) {
+ public OpChain(MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes, int virtualServerId,
+ long requestId, int stageId) {
_root = root;
_receivingMailbox = new HashSet<>(receivingMailboxes);
- _id = String.format("%s_%s", requestId, stageId);
- _stats = new OpChainStats(_id);
+ _id = new OpChainId(requestId, virtualServerId, stageId);
+ _stats = new OpChainStats(_id.toString());
}
public Operator<TransferableBlock> getRoot() {
@@ -51,6 +52,10 @@ public class OpChain implements AutoCloseable {
return _receivingMailbox;
}
+ public OpChainId getId() {
+ return _id;
+ }
+
// TODO: Move OperatorStats here.
public OpChainStats getStats() {
return _stats;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java
new file mode 100644
index 0000000000..355021e58f
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.Objects;
+
+
+public class OpChainId {
+ final long _requestId;
+ final int _virtualServerId;
+ final int _stageId;
+
+ public OpChainId(long requestId, int virtualServerId, int stageId) {
+ _requestId = requestId;
+ _virtualServerId = virtualServerId;
+ _stageId = stageId;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s_%s_%s", _requestId, _virtualServerId, _stageId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OpChainId opChainId = (OpChainId) o;
+ return _requestId == opChainId._requestId && _virtualServerId == opChainId._virtualServerId
+ && _stageId == opChainId._stageId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(_requestId, _virtualServerId, _stageId);
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
index a80233bee1..60dba3a0de 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
@@ -26,12 +26,14 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
+import javax.annotation.concurrent.NotThreadSafe;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
/**
* {@code OpChainStats} tracks execution statistics for {@link OpChain}s.
*/
+@NotThreadSafe
public class OpChainStats {
// use memoized supplier so that the timing doesn't start until the
@@ -71,14 +73,6 @@ public class OpChainStats {
}
}
- public void startExecutionTimer() {
- _exTimerStarted = true;
- _exTimer.get();
- if (!_executeStopwatch.isRunning()) {
- _executeStopwatch.start();
- }
- }
-
public Map<String, OperatorStats> getOperatorStatsMap() {
return _operatorStatsMap;
}
@@ -87,6 +81,14 @@ public class OpChainStats {
_operatorStatsMap = operatorStatsMap;
}
+ private void startExecutionTimer() {
+ _exTimerStarted = true;
+ _exTimer.get();
+ if (!_executeStopwatch.isRunning()) {
+ _executeStopwatch.start();
+ }
+ }
+
@Override
public String toString() {
return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued Time: %sms", _id, _queuedCount.get(),
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 77de6197ca..fb2de69d89 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -58,7 +58,8 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
public static OpChain build(StageNode node, PlanRequestContext context) {
MultiStageOperator root = node.visit(INSTANCE, context);
- return new OpChain(root, context.getReceivingMailboxes(), context.getRequestId(), context.getStageId());
+ return new OpChain(root, context.getReceivingMailboxes(), context._server.virtualId(), context.getRequestId(),
+ context.getStageId());
}
@Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
index d521f0f926..bc80c96df6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryConfig.java
@@ -70,7 +70,7 @@ public class QueryConfig {
* an operator chain despite any amount of time elapsed.
*/
public static final String KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS = "pinot.query.scheduler.release.timeout.ms";
- public static final long DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS = -1;
+ public static final long DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS = 10_000;
private QueryConfig() {
// do not instantiate.
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index fa6ba5e7c4..36df3de6ff 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -65,7 +65,7 @@ public class QueryServerEnclosure {
private final InstanceDataManager _instanceDataManager;
private final HelixManager _helixManager;
- private QueryRunner _queryRunner;
+ private final QueryRunner _queryRunner;
public QueryServerEnclosure(MockInstanceDataManagerFactory factory) {
try {
@@ -112,7 +112,6 @@ public class QueryServerEnclosure {
public void start()
throws Exception {
PinotConfiguration configuration = new PinotConfiguration(_runnerConfig);
- _queryRunner = new QueryRunner();
_queryRunner.init(configuration, _instanceDataManager, _helixManager, mockServiceMetrics());
_queryRunner.start();
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index c38a210e95..9f0d9e8963 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
-import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
@@ -44,11 +43,7 @@ public class OpChainSchedulerServiceTest {
private ExecutorService _executor;
private AutoCloseable _mocks;
- @Mock
private MultiStageOperator _operatorA;
- @Mock
- private MultiStageOperator _operatorB;
- @Mock
private OpChainScheduler _scheduler;
@BeforeClass
@@ -64,9 +59,10 @@ public class OpChainSchedulerServiceTest {
@BeforeMethod
public void beforeMethod() {
+ _operatorA = Mockito.mock(MultiStageOperator.class);
+ _scheduler = Mockito.mock(OpChainScheduler.class);
clearInvocations(_scheduler);
clearInvocations(_operatorA);
- clearInvocations(_operatorB);
}
private void initExecutor(int numThreads) {
@@ -74,16 +70,15 @@ public class OpChainSchedulerServiceTest {
}
private OpChain getChain(MultiStageOperator operator) {
- return new OpChain(operator, ImmutableList.of(), 123, 1);
+ return new OpChain(operator, ImmutableList.of(), 1, 123, 1);
}
@Test
public void shouldScheduleSingleOpChainRegisteredAfterStart()
throws InterruptedException {
- // Given:
initExecutor(1);
- Mockito.when(_scheduler.hasNext()).thenReturn(true);
- Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+ OpChain opChain = getChain(_operatorA);
+ Mockito.when(_scheduler.next(Mockito.anyLong(), Mockito.any())).thenReturn(opChain).thenReturn(null);
OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
CountDownLatch latch = new CountDownLatch(1);
@@ -92,11 +87,9 @@ public class OpChainSchedulerServiceTest {
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
});
- // When:
scheduler.startAsync().awaitRunning();
- scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
+ scheduler.register(opChain);
- // Then:
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
scheduler.stopAsync().awaitTerminated();
}
@@ -104,10 +97,9 @@ public class OpChainSchedulerServiceTest {
@Test
public void shouldScheduleSingleOpChainRegisteredBeforeStart()
throws InterruptedException {
- // Given:
initExecutor(1);
- Mockito.when(_scheduler.hasNext()).thenReturn(true);
- Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+ OpChain opChain = getChain(_operatorA);
+ Mockito.when(_scheduler.next(Mockito.anyLong(), Mockito.any())).thenReturn(opChain).thenReturn(null);
OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
CountDownLatch latch = new CountDownLatch(1);
@@ -116,152 +108,100 @@ public class OpChainSchedulerServiceTest {
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
});
- // When:
- scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
+ scheduler.register(opChain);
scheduler.startAsync().awaitRunning();
- // Then:
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
scheduler.stopAsync().awaitTerminated();
}
@Test
- public void shouldReRegisterOpChainOnNoOpBlock()
+ public void shouldYieldOpChainOnNoOpBlock()
throws InterruptedException {
- // Given:
initExecutor(1);
- Mockito.when(_scheduler.hasNext()).thenReturn(true);
- Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+ OpChain opChain = getChain(_operatorA);
+ Mockito.when(_scheduler.next(Mockito.anyLong(), Mockito.any())).thenReturn(opChain).thenReturn(null);
OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
CountDownLatch latch = new CountDownLatch(1);
- Mockito.when(_operatorA.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
- .thenAnswer(inv -> {
- latch.countDown();
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
- });
-
- // When:
- scheduler.startAsync().awaitRunning();
- scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
-
- // Then:
- Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
- Mockito.verify(_scheduler, Mockito.times(2)).register(Mockito.any(OpChain.class), Mockito.any(boolean.class));
- scheduler.stopAsync().awaitTerminated();
- }
-
- @Test
- public void shouldYieldOpChainsWhenNoWorkCanBeDone()
- throws InterruptedException {
- // Given:
- initExecutor(1);
- Mockito.when(_scheduler.hasNext()).thenReturn(true);
- Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA)).thenReturn(getChain(_operatorB))
- .thenReturn(getChain(_operatorA));
- OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
-
- AtomicBoolean opAReturnedNoOp = new AtomicBoolean(false);
- AtomicBoolean hasOpBRan = new AtomicBoolean(false);
-
- CountDownLatch latch = new CountDownLatch(1);
- Mockito.when(_operatorA.nextBlock()).thenAnswer(inv -> {
- if (hasOpBRan.get()) {
- latch.countDown();
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
- } else {
- opAReturnedNoOp.set(true);
- return TransferableBlockUtils.getNoOpTransferableBlock();
- }
- });
-
- Mockito.when(_operatorB.nextBlock()).thenAnswer(inv -> {
- hasOpBRan.set(true);
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
- });
+ Mockito.when(_operatorA.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
+ Mockito.doAnswer(inv -> {
+ latch.countDown();
+ return null;
+ }).when(_scheduler).yield(Mockito.any());
- // When:
scheduler.startAsync().awaitRunning();
- scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
- scheduler.register(new OpChain(_operatorB, ImmutableList.of(), 123, 1));
+ scheduler.register(opChain);
- // Then:
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
- Assert.assertTrue(opAReturnedNoOp.get(), "expected opA to be scheduled first");
scheduler.stopAsync().awaitTerminated();
}
@Test
- public void shouldNotCallSchedulerNextWhenHasNextReturnsFalse()
+ public void shouldScheduleOpChainEvenIfNoOpChainsInAWhile()
throws InterruptedException {
- // Given:
initExecutor(1);
- CountDownLatch latch = new CountDownLatch(1);
- Mockito.when(_scheduler.hasNext()).thenAnswer(inv -> {
+ OpChain opChain = getChain(_operatorA);
+ CountDownLatch latch = new CountDownLatch(3);
+ AtomicBoolean returnedOpChain = new AtomicBoolean(false);
+ Mockito.when(_scheduler.next(Mockito.anyLong(), Mockito.any())).thenAnswer(inv -> {
latch.countDown();
- return false;
+ if (latch.getCount() == 0 && returnedOpChain.compareAndSet(false, true)) {
+ return opChain;
+ }
+ return null;
});
OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
- // When:
scheduler.startAsync().awaitRunning();
- // Then:
- Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected hasNext to be called");
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected opChain to be scheduled");
scheduler.stopAsync().awaitTerminated();
-
- Mockito.verify(_scheduler, Mockito.never()).next();
}
@Test
- public void shouldReevaluateHasNextWhenOnDataAvailableIsCalled()
+ public void shouldCallCloseOnOperators()
throws InterruptedException {
- // Given:
initExecutor(1);
- CountDownLatch firstHasNext = new CountDownLatch(1);
- CountDownLatch secondHasNext = new CountDownLatch(1);
- Mockito.when(_scheduler.hasNext()).thenAnswer(inv -> {
- firstHasNext.countDown();
- return false;
- }).then(inv -> {
- secondHasNext.countDown();
- return false;
- });
-
+ OpChain opChain = getChain(_operatorA);
+ Mockito.when(_scheduler.next(Mockito.anyLong(), Mockito.any())).thenReturn(opChain).thenReturn(null);
OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
- // When:
+ CountDownLatch latch = new CountDownLatch(1);
+ Mockito.when(_operatorA.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.doAnswer(inv -> {
+ latch.countDown();
+ return null;
+ }).when(_operatorA).close();
+
scheduler.startAsync().awaitRunning();
- Assert.assertTrue(firstHasNext.await(10, TimeUnit.SECONDS), "expected hasNext to be called");
- scheduler.onDataAvailable(null);
+ scheduler.register(opChain);
- // Then:
- Assert.assertTrue(secondHasNext.await(10, TimeUnit.SECONDS), "expected hasNext to be called again");
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
scheduler.stopAsync().awaitTerminated();
}
@Test
- public void shouldCallCloseOnOperators()
+ public void shouldCallCancelOnOpChainsThatThrow()
throws InterruptedException {
- // Given:
initExecutor(1);
- Mockito.when(_scheduler.hasNext()).thenReturn(true).thenReturn(false);
- Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+ OpChain opChain = getChain(_operatorA);
+ Mockito.when(_scheduler.next(Mockito.anyLong(), Mockito.any())).thenReturn(opChain).thenReturn(null);
OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
CountDownLatch latch = new CountDownLatch(1);
- Mockito.when(_operatorA.nextBlock()).thenAnswer(inv -> {
+ Mockito.when(_operatorA.nextBlock()).thenThrow(new RuntimeException("foo"));
+ Mockito.doAnswer(inv -> {
latch.countDown();
- return TransferableBlockUtils.getEndOfStreamTransferableBlock();
- });
+ return null;
+ }).when(_operatorA).cancel(Mockito.any());
- // When:
scheduler.startAsync().awaitRunning();
- scheduler.register(new OpChain(_operatorA, ImmutableList.of(), 123, 1));
+ scheduler.register(opChain);
- // Then:
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
scheduler.stopAsync().awaitTerminated();
- Mockito.verify(_operatorA, Mockito.times(1)).close();
+ Mockito.verify(_operatorA, Mockito.times(1)).cancel(Mockito.any());
+ Mockito.verify(_scheduler, Mockito.times(1)).deregister(Mockito.any());
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 6bf18a73b3..88974b1cb3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -19,7 +19,7 @@
package org.apache.pinot.query.runtime.executor;
import com.google.common.collect.ImmutableList;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
@@ -28,6 +28,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -35,17 +36,27 @@ import org.testng.annotations.Test;
public class RoundRobinSchedulerTest {
private static final int DEFAULT_SENDER_STAGE_ID = 0;
private static final int DEFAULT_RECEIVER_STAGE_ID = 1;
-
- private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier("1_1", "0@foo:2", "0@bar:3",
- DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
- private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier("1_2", "0@foo:2", "0@bar:3",
- DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+ private static final int DEFAULT_VIRTUAL_SERVER_ID = 1;
+ private static final int DEFAULT_POLL_TIMEOUT_MS = 0;
+ private static final int DEFAULT_RELEASE_TIMEOUT_MS = 10;
+ private static final long DEFAULT_REQUEST_ID = 123;
+ private static final String DEFAULT_SENDER_SERIALIZED = "0@foo:2";
+ private static final String DEFAULT_JOB_ID = String.format("%s_%s", DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+
+ private static final MailboxIdentifier MAILBOX_1 = new JsonMailboxIdentifier(DEFAULT_JOB_ID,
+ DEFAULT_SENDER_SERIALIZED, "1@bar:1", DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+ private static final MailboxIdentifier MAILBOX_2 = new JsonMailboxIdentifier(DEFAULT_JOB_ID,
+ DEFAULT_SENDER_SERIALIZED, "2@bar:1", DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+ private static final MailboxIdentifier MAILBOX_3 = new JsonMailboxIdentifier(DEFAULT_JOB_ID,
+ DEFAULT_SENDER_SERIALIZED, "3@bar:1", DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
@Mock
private MultiStageOperator _operator;
private AutoCloseable _mocks;
+ private RoundRobinScheduler _scheduler;
+
@BeforeClass
public void beforeClass() {
_mocks = MockitoAnnotations.openMocks(this);
@@ -57,109 +68,108 @@ public class RoundRobinSchedulerTest {
_mocks.close();
}
- @Test
- public void shouldScheduleNewOpChainsImmediately() {
- // Given:
- OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, DEFAULT_RECEIVER_STAGE_ID);
- RoundRobinScheduler scheduler = new RoundRobinScheduler();
-
- // When:
- scheduler.register(chain, true);
-
- // Then:
- Assert.assertTrue(scheduler.hasNext());
- Assert.assertEquals(scheduler.next(), chain);
- }
-
- @Test
- public void shouldNotScheduleRescheduledOpChainsImmediately() {
- // Given:
- OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, DEFAULT_RECEIVER_STAGE_ID);
- RoundRobinScheduler scheduler = new RoundRobinScheduler();
-
- // When:
- scheduler.register(chain, false);
-
- // Then:
- Assert.assertFalse(scheduler.hasNext());
- }
-
- @Test
- public void shouldScheduleRescheduledOpChainOnDataAvailable() {
- // Given:
- OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, DEFAULT_RECEIVER_STAGE_ID);
- OpChain chain2 = new OpChain(_operator, ImmutableList.of(MAILBOX_2), 123, DEFAULT_RECEIVER_STAGE_ID);
- RoundRobinScheduler scheduler = new RoundRobinScheduler();
-
- // When:
- scheduler.register(chain1, false);
- scheduler.register(chain2, false);
- scheduler.onDataAvailable(MAILBOX_1);
-
- // Then:
- Assert.assertTrue(scheduler.hasNext());
- Assert.assertEquals(scheduler.next(), chain1);
- Assert.assertFalse(scheduler.hasNext());
+ @AfterTest
+ public void afterTest() {
+ _scheduler.shutdownNow();
}
@Test
- public void shouldScheduleRescheduledOpChainAfterTimeout() {
- // Given:
- OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, DEFAULT_RECEIVER_STAGE_ID);
- AtomicLong ticker = new AtomicLong(0);
- RoundRobinScheduler scheduler = new RoundRobinScheduler(100, ticker::get);
-
- // When:
- scheduler.register(chain1, false);
- ticker.set(101);
-
- // Then:
- Assert.assertTrue(scheduler.hasNext());
- Assert.assertEquals(scheduler.next(), chain1);
+ public void testSchedulerHappyPath()
+ throws InterruptedException {
+ OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), DEFAULT_VIRTUAL_SERVER_ID,
+ 123, DEFAULT_RECEIVER_STAGE_ID);
+ _scheduler = new RoundRobinScheduler(DEFAULT_RELEASE_TIMEOUT_MS);
+ _scheduler.register(chain);
+
+ // OpChain is scheduled immediately
+ Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain);
+ // No op-chains ready, so scheduler returns null
+ Assert.assertNull(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+ // When Op-Chain is done executing, yield is called
+ _scheduler.yield(chain);
+ // When data is received, callback is called
+ _scheduler.onDataAvailable(MAILBOX_1);
+ // next should return the OpChain immediately after the callback
+ Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain);
+ // Say the OpChain is done, then a de-register will be called
+ _scheduler.deregister(chain);
+
+ // There should be no entries left in the scheduler
+ Assert.assertEquals(0,
+ _scheduler.aliveChainsSize() + _scheduler.readySize() + _scheduler.seenMailSize() + _scheduler.availableSize());
}
@Test
- public void shouldScheduleRescheduledOpChainOnDataAvailableBeforeRegister() {
- // Given:
- OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, DEFAULT_RECEIVER_STAGE_ID);
- RoundRobinScheduler scheduler = new RoundRobinScheduler();
-
- // When:
- scheduler.onDataAvailable(MAILBOX_1);
- scheduler.register(chain, false);
-
- // Then:
- Assert.assertTrue(scheduler.hasNext());
- Assert.assertEquals(scheduler.next(), chain);
- }
-
- @Test
- public void shouldNotScheduleRescheduledOpChainOnDataAvailableForDifferentMailbox() {
- // Given:
- OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), 123, DEFAULT_RECEIVER_STAGE_ID);
- RoundRobinScheduler scheduler = new RoundRobinScheduler();
-
- // When:
- scheduler.register(chain, false);
- scheduler.onDataAvailable(MAILBOX_2);
-
- // Then:
- Assert.assertFalse(scheduler.hasNext());
+ public void testSchedulerWhenSenderDies()
+ throws InterruptedException {
+ OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), DEFAULT_VIRTUAL_SERVER_ID,
+ DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+ _scheduler = new RoundRobinScheduler(DEFAULT_RELEASE_TIMEOUT_MS);
+ _scheduler.register(chain);
+
+ // OpChain runs immediately after registration
+ Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain);
+ // No more OpChains to run
+ Assert.assertNull(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+ // When op-chain returns a no-op block, suspend it
+ _scheduler.yield(chain);
+
+ // Unless a callback is called, the chain would remain suspended. However, the scheduler will automatically
+ // promote available OpChains to ready every releaseMs.
+ Assert.assertEquals(_scheduler.next(DEFAULT_RELEASE_TIMEOUT_MS + 100, TimeUnit.MILLISECONDS), chain);
+
+ // Assuming the OpChain has timed out, the OpChain will be de-registered
+ _scheduler.deregister(chain);
+
+ // There should be no entries left in the scheduler
+ Assert.assertEquals(0,
+ _scheduler.aliveChainsSize() + _scheduler.readySize() + _scheduler.seenMailSize() + _scheduler.availableSize());
}
@Test
- public void shouldScheduleRescheduledOpChainOnDataAvailableForAnyMailbox() {
- // Given:
- OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1, MAILBOX_2), 123, DEFAULT_RECEIVER_STAGE_ID);
- RoundRobinScheduler scheduler = new RoundRobinScheduler();
-
- // When:
- scheduler.register(chain, false);
- scheduler.onDataAvailable(MAILBOX_2);
-
- // Then:
- Assert.assertTrue(scheduler.hasNext());
- Assert.assertEquals(scheduler.next(), chain);
- Assert.assertEquals(scheduler._seenMail.size(), 0);
+ public void testSchedulerWhenParallelismGtOne()
+ throws InterruptedException {
+ // When parallelism is > 1, multiple OpChains with the same requestId and stageId would be registered in the same
+ // scheduler. Data received on a given mailbox should wake up exactly 1 OpChain corresponding to the virtual
+ // server-id determined by the Mailbox.
+ OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1), MAILBOX_1.getToHost().virtualId(),
+ DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+ OpChain chain2 = new OpChain(_operator, ImmutableList.of(MAILBOX_2), MAILBOX_2.getToHost().virtualId(),
+ DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+ OpChain chain3 = new OpChain(_operator, ImmutableList.of(MAILBOX_3), MAILBOX_3.getToHost().virtualId(),
+ DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+ // Register 3 OpChains. Keep release timeout high to avoid unintended OpChain wake-ups.
+ _scheduler = new RoundRobinScheduler(10_000);
+ _scheduler.register(chain1);
+ _scheduler.register(chain2);
+ _scheduler.register(chain3);
+
+ // OpChains are returned in the order in which they were registered
+ Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain1);
+ Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain2);
+ Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain3);
+ // No op-chains ready, so scheduler returns null
+ Assert.assertNull(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+ // When Op-Chains are done executing, yield is called in any order
+ _scheduler.yield(chain1);
+ _scheduler.yield(chain3);
+ _scheduler.yield(chain2);
+ // Data may be received in arbitrary order
+ _scheduler.onDataAvailable(MAILBOX_2);
+ _scheduler.onDataAvailable(MAILBOX_3);
+ _scheduler.onDataAvailable(MAILBOX_1);
+ // Subsequent polls would be in the order the callback was processed. A callback here is said to be "processed"
+ // if it has successfully returned.
+ Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain2);
+ Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain3);
+ Assert.assertEquals(_scheduler.next(DEFAULT_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS), chain1);
+ // De-register may be called in any order again
+ _scheduler.deregister(chain3);
+ _scheduler.deregister(chain2);
+ _scheduler.deregister(chain1);
+
+ // There should be no entries left in the scheduler after everything is done
+ Assert.assertEquals(0,
+ _scheduler.aliveChainsSize() + _scheduler.readySize() + _scheduler.seenMailSize() + _scheduler.availableSize());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org