You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/13 16:53:29 UTC
[pinot] branch master updated: [multistage] follow up leaf scheduler (#10760)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0912544cce [multistage] follow up leaf scheduler (#10760)
0912544cce is described below
commit 0912544cce9307462c5931abdc61131ec2d50bfa
Author: Rong Rong <ro...@apache.org>
AuthorDate: Sat May 13 09:53:21 2023 -0700
[multistage] follow up leaf scheduler (#10760)
This is a follow-up that addresses the review comments on #10711.
* with the execution model change, the mailbox send operator no longer needs to run a while loop
* with the changes to the leaf-stage operator, a separate leaf scheduler is no longer needed.
* also fix the scheduler test naming (rename YieldingSchedulerServiceTest to OpChainSchedulerServiceTest)
---------
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../apache/pinot/query/runtime/QueryRunner.java | 7 +-
.../runtime/executor/LeafSchedulerService.java | 102 ---------------------
.../runtime/executor/OpChainSchedulerService.java | 10 +-
.../runtime/executor/RoundRobinScheduler.java | 2 +-
.../runtime/operator/MailboxSendOperator.java | 29 +++---
...eTest.java => OpChainSchedulerServiceTest.java} | 2 +-
.../runtime/operator/MailboxSendOperatorTest.java | 8 +-
7 files changed, 31 insertions(+), 129 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 0ff209b904..92bf32638d 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -45,7 +45,6 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
-import org.apache.pinot.query.runtime.executor.LeafSchedulerService;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
@@ -91,7 +90,7 @@ public class QueryRunner {
private ExecutorService _queryRunnerExecutorService;
private OpChainSchedulerService _intermScheduler;
- private LeafSchedulerService _leafScheduler;
+ private OpChainSchedulerService _leafScheduler;
/**
* Initializes the query executor.
@@ -117,7 +116,7 @@ public class QueryRunner {
new NamedThreadFactory("query_runner_on_" + _port + "_port"));
_intermScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
getQueryWorkerIntermExecutorService());
- _leafScheduler = new LeafSchedulerService(getQueryRunnerExecutorService());
+ _leafScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), getQueryRunnerExecutorService());
_mailboxService = new MailboxService(_hostname, _port, config, _intermScheduler::onDataAvailable);
_serverExecutor = new ServerQueryExecutorV1Impl();
_serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
@@ -131,6 +130,7 @@ public class QueryRunner {
_helixPropertyStore = _helixManager.getHelixPropertyStore();
_mailboxService.start();
_serverExecutor.start();
+ _leafScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
_intermScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
}
@@ -138,6 +138,7 @@ public class QueryRunner {
throws TimeoutException {
_serverExecutor.shutDown();
_mailboxService.shutdown();
+ _leafScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
_intermScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java
deleted file mode 100644
index faa4bf4945..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.executor;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-import org.apache.pinot.core.util.trace.TraceRunnable;
-import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.operator.OpChain;
-import org.apache.pinot.query.runtime.operator.OpChainId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class LeafSchedulerService {
- private static final Logger LOGGER = LoggerFactory.getLogger(LeafSchedulerService.class);
-
- private final ExecutorService _executorService;
- private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
-
- public LeafSchedulerService(ExecutorService executorService) {
- _executorService = executorService;
- _submittedOpChainMap = new ConcurrentHashMap<>();
- }
-
- public void register(OpChain operatorChain) {
- Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
- @Override
- public void runJob() {
- boolean isFinished = false;
- boolean returnedErrorBlock = false;
- Throwable thrown = null;
- try {
- LOGGER.trace("({}): Executing", operatorChain);
- operatorChain.getStats().executing();
- TransferableBlock result = operatorChain.getRoot().nextBlock();
- while (!result.isEndOfStreamBlock()) {
- result = operatorChain.getRoot().nextBlock();
- }
- isFinished = true;
- if (result.isErrorBlock()) {
- returnedErrorBlock = true;
- LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
- result.getDataBlock().getExceptions());
- } else {
- LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
- }
- } catch (Exception e) {
- LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
- thrown = e;
- } finally {
- if (returnedErrorBlock || thrown != null) {
- cancelOpChain(operatorChain, thrown);
- } else if (isFinished) {
- closeOpChain(operatorChain);
- }
- }
- }
- });
- _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
- }
-
- public void cancel(long requestId) {
- // simple cancellation. for leaf stage this cannot be a dangling opchain b/c they will eventually be cleared up
- // via query timeout.
- List<OpChainId> opChainIdsToCancel = _submittedOpChainMap.keySet()
- .stream().filter(opChainId -> opChainId.getRequestId() == requestId).collect(Collectors.toList());
- for (OpChainId opChainId : opChainIdsToCancel) {
- Future<?> future = _submittedOpChainMap.get(opChainId);
- if (future != null) {
- future.cancel(true);
- }
- }
- }
-
- private void closeOpChain(OpChain opChain) {
- opChain.close();
- }
-
- private void cancelOpChain(OpChain opChain, Throwable t) {
- opChain.cancel(t);
- }
-}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index a886269ab9..b4e55796b7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -48,20 +48,16 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
* Default cancel signal retention, this should be set to several times larger than
* {@link org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
*/
- private static final long DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;
+ private static final long SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 60_000L;
private final OpChainScheduler _scheduler;
private final ExecutorService _workerPool;
- private final Cache<Long, Long> _cancelledRequests;
+ private final Cache<Long, Long> _cancelledRequests = CacheBuilder.newBuilder()
+ .expireAfterWrite(SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS, TimeUnit.MILLISECONDS).build();
public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
- this(scheduler, workerPool, DEFAULT_SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS);
- }
-
- public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool, long cancelRetentionMs) {
_scheduler = scheduler;
_workerPool = workerPool;
- _cancelledRequests = CacheBuilder.newBuilder().expireAfterWrite(cancelRetentionMs, TimeUnit.MILLISECONDS).build();
}
@Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
index a1934d884a..b143cee77e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java
@@ -94,7 +94,7 @@ public class RoundRobinScheduler implements OpChainScheduler {
private final Supplier<Long> _ticker;
private final Map<OpChainId, OpChain> _aliveChains = new ConcurrentHashMap<>();
- final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
+ private final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
private final Map<OpChainId, Long> _available = new ConcurrentHashMap<>();
private final BlockingQueue<OpChain> _ready = new LinkedBlockingQueue<>();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 5fba67f735..4741d0a151 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -109,7 +109,8 @@ public class MailboxSendOperator extends MultiStageOperator {
sendingMailboxes.add(mailboxService.getSendingMailbox(receiverMailboxMetadatas.getVirtualAddress(i).hostname(),
receiverMailboxMetadatas.getVirtualAddress(i).port(), sendingMailboxIds.get(i), deadlineMs));
}
- return BlockExchange.getExchange(sendingMailboxes, exchangeType, keySelector, TransferableBlockUtils::splitBlock);
+ return BlockExchange.getExchange(sendingMailboxes, exchangeType, keySelector,
+ TransferableBlockUtils::splitBlock);
}
@Override
@@ -128,21 +129,21 @@ public class MailboxSendOperator extends MultiStageOperator {
TransferableBlock transferableBlock;
try {
transferableBlock = _sourceOperator.nextBlock();
- while (!transferableBlock.isNoOpBlock()) {
- if (transferableBlock.isEndOfStreamBlock()) {
- if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
- //Stats need to be populated here because the block is being sent to the mailbox
- // and the receiving opChain will not be able to access the stats from the previous opChain
- TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
- OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
- _exchange.send(eosBlockWithStats);
- } else {
- _exchange.send(transferableBlock);
- }
- return transferableBlock;
+ if (transferableBlock.isNoOpBlock()) {
+ return transferableBlock;
+ } else if (transferableBlock.isEndOfStreamBlock()) {
+ if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
+ // Stats need to be populated here because the block is being sent to the mailbox
+ // and the receiving opChain will not be able to access the stats from the previous opChain
+ TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
+ OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
+ _exchange.send(eosBlockWithStats);
+ } else {
+ _exchange.send(transferableBlock);
}
+ } else { // normal blocks
+ // check whether we should continue depending on exchange queue condition.
_exchange.send(transferableBlock);
- transferableBlock = _sourceOperator.nextBlock();
}
} catch (Exception e) {
transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
similarity index 99%
rename from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java
rename to pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 4f7f8e5565..fbbab9e4d9 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -41,7 +41,7 @@ import org.testng.annotations.Test;
import static org.mockito.Mockito.clearInvocations;
-public class YieldingSchedulerServiceTest {
+public class OpChainSchedulerServiceTest {
private ExecutorService _executor;
private AutoCloseable _mocks;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 5080ad8454..1d13e5f097 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -151,7 +151,13 @@ public class MailboxSendOperatorTest {
TransferableBlock block = mailboxSendOperator.nextBlock();
// Then:
- assertSame(block, eosBlock, "expected EOS block to propagate");
+ assertSame(block, dataBlock, "expected data block to propagate first");
+
+ // When:
+ block = mailboxSendOperator.nextBlock();
+
+ // Then:
+ assertSame(block, eosBlock, "expected EOS block to propagate next");
ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
verify(_exchange, times(2)).send(captor.capture());
List<TransferableBlock> blocks = captor.getAllValues();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org