You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/09 04:23:37 UTC
[pinot] branch master updated: [multistage][cleanup] make leaf stage also registered with SchedulerService (#10711)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7ab0af00a1 [multistage][cleanup] make leaf stage also registered with SchedulerService (#10711)
7ab0af00a1 is described below
commit 7ab0af00a1179b011a8ab9211f28eeb318ea336c
Author: Rong Rong <ro...@apache.org>
AuthorDate: Mon May 8 21:23:27 2023 -0700
[multistage][cleanup] make leaf stage also registered with SchedulerService (#10711)
* Use cached threadpool to avoid sending starvation
* Separate leaf stage to a different scheduler
---------
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../apache/pinot/query/runtime/QueryRunner.java | 117 +++++++--------------
.../runtime/executor/LeafSchedulerService.java | 102 ++++++++++++++++++
.../LeafStageTransferableBlockOperator.java | 58 ++++++----
.../apache/pinot/query/service/QueryServer.java | 6 +-
.../apache/pinot/query/QueryServerEnclosure.java | 3 +-
...Test.java => YieldingSchedulerServiceTest.java} | 2 +-
.../LeafStageTransferableBlockOperatorTest.java | 82 ++++++++++++---
.../pinot/query/runtime/operator/OpChainTest.java | 7 +-
8 files changed, 254 insertions(+), 123 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index ced674d269..8876a950e3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -35,7 +36,6 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
-import org.apache.pinot.core.operator.combine.BaseCombineOperator;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
@@ -45,7 +45,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.PlanFragmentMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
-import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.executor.LeafSchedulerService;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
@@ -85,34 +85,13 @@ public class QueryRunner {
private String _hostname;
private int _port;
private VirtualServerAddress _rootServer;
- // Query worker threads are used for (1) running intermediate stage operators (2) running segment level operators
- /**
- * Query worker threads are used for:
- * <ol>
- * <li>
- * Running intermediate stage operators (v2 engine operators).
- * </li>
- * <li>
- * Running per-segment operators submitted in {@link BaseCombineOperator}.
- * </li>
- * </ol>
- */
+
private ExecutorService _queryWorkerIntermExecutorService;
private ExecutorService _queryWorkerLeafExecutorService;
- /**
- * Query runner threads are used for:
- * <ol>
- * <li>
- * Merging results in BaseCombineOperator for leaf stages. Results are provided by per-segment operators run in
- * worker threads
- * </li>
- * <li>
- * Building the OperatorChain and submitting to the scheduler for non-leaf stages (intermediate stages).
- * </li>
- * </ol>
- */
private ExecutorService _queryRunnerExecutorService;
- private OpChainSchedulerService _scheduler;
+
+ private OpChainSchedulerService _intermScheduler;
+ private LeafSchedulerService _leafScheduler;
/**
* Initializes the query executor.
@@ -130,15 +109,16 @@ public class QueryRunner {
try {
long releaseMs = config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
QueryConfig.DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS);
- _queryWorkerIntermExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
+ _queryWorkerIntermExecutorService = Executors.newCachedThreadPool(
new NamedThreadFactory("query_intermediate_worker_on_" + _port + "_port"));
_queryWorkerLeafExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
new NamedThreadFactory("query_leaf_worker_on_" + _port + "_port"));
- _queryRunnerExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
+ _queryRunnerExecutorService = Executors.newCachedThreadPool(
new NamedThreadFactory("query_runner_on_" + _port + "_port"));
- _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
+ _intermScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
getQueryWorkerIntermExecutorService());
- _mailboxService = new MailboxService(_hostname, _port, config, _scheduler::onDataAvailable);
+ _leafScheduler = new LeafSchedulerService(getQueryRunnerExecutorService());
+ _mailboxService = new MailboxService(_hostname, _port, config, _intermScheduler::onDataAvailable);
_serverExecutor = new ServerQueryExecutorV1Impl();
_serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
} catch (Exception e) {
@@ -151,14 +131,14 @@ public class QueryRunner {
_helixPropertyStore = _helixManager.getHelixPropertyStore();
_mailboxService.start();
_serverExecutor.start();
- _scheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
+ _intermScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
}
public void shutDown()
throws TimeoutException {
_serverExecutor.shutDown();
_mailboxService.shutdown();
- _scheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
+ _intermScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
}
public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
@@ -168,18 +148,20 @@ public class QueryRunner {
Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
long deadlineMs = System.currentTimeMillis() + timeoutMs;
if (isLeafStage(distributedStagePlan)) {
- runLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs, requestId);
+ OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs,
+ requestId);
+ _leafScheduler.register(rootOperator);
} else {
PlanNode stageRoot = distributedStagePlan.getStageRoot();
OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled));
- _scheduler.register(rootOperator);
+ _intermScheduler.register(rootOperator);
}
}
public void cancel(long requestId) {
- _scheduler.cancel(requestId);
+ _intermScheduler.cancel(requestId);
}
@VisibleForTesting
@@ -196,7 +178,7 @@ public class QueryRunner {
return _queryRunnerExecutorService;
}
- private void runLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
+ private OpChain compileLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
long timeoutMs, long deadlineMs, long requestId) {
boolean isTraceEnabled =
Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
@@ -208,9 +190,19 @@ public class QueryRunner {
serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis()));
}
- getQueryRunnerExecutorService().submit(() -> {
- processServerQuery(requestId, timeoutMs, deadlineMs, isTraceEnabled, distributedStagePlan, serverQueryRequests);
- });
+ MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
+ OpChainExecutionContext opChainExecutionContext =
+ new OpChainExecutionContext(_mailboxService, requestId, sendNode.getPlanFragmentId(),
+ distributedStagePlan.getServer(), timeoutMs, deadlineMs, distributedStagePlan.getStageMetadata(),
+ isTraceEnabled);
+ MultiStageOperator leafStageOperator =
+ new LeafStageTransferableBlockOperator(opChainExecutionContext, this::processServerQueryRequest,
+ serverQueryRequests, sendNode.getDataSchema());
+ MailboxSendOperator mailboxSendOperator =
+ new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getExchangeType(),
+ sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(),
+ sendNode.isSortOnSender(), sendNode.getReceiverStageId());
+ return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList());
}
private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
@@ -249,46 +241,17 @@ public class QueryRunner {
return requests;
}
- private void processServerQuery(long requestId, long timeoutMs, long deadlineMs, boolean isTraceEnabled,
- DistributedStagePlan distributedStagePlan, List<ServerQueryRequest> serverQueryRequests) {
- MailboxSendOperator mailboxSendOperator = null;
+ private InstanceResponseBlock processServerQueryRequest(ServerQueryRequest request) {
+ InstanceResponseBlock result;
try {
- // send the data table via mailbox in one-off fashion (e.g. no block-level split, one data table/partition key)
- List<InstanceResponseBlock> serverQueryResults = new ArrayList<>(serverQueryRequests.size());
- for (ServerQueryRequest request : serverQueryRequests) {
- InstanceResponseBlock result;
- try {
- result = _serverExecutor.execute(request, getQueryWorkerLeafExecutorService());
- } catch (Exception e) {
- InstanceResponseBlock errorResponse = new InstanceResponseBlock();
- errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
- e.getMessage() + QueryException.getTruncatedStackTrace(e));
- result = errorResponse;
- }
- serverQueryResults.add(result);
- }
- MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
- OpChainExecutionContext opChainExecutionContext =
- new OpChainExecutionContext(_mailboxService, requestId, sendNode.getPlanFragmentId(),
- distributedStagePlan.getServer(), timeoutMs, deadlineMs, distributedStagePlan.getStageMetadata(),
- isTraceEnabled);
- MultiStageOperator leafStageOperator =
- new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema());
- mailboxSendOperator =
- new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getExchangeType(),
- sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(),
- sendNode.isSortOnSender(), sendNode.getReceiverStageId());
- int blockCounter = 0;
- while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
- LOGGER.debug("Acquired transferable block: {}", blockCounter++);
- }
- mailboxSendOperator.close();
+ result = _serverExecutor.execute(request, getQueryWorkerLeafExecutorService());
} catch (Exception e) {
- LOGGER.error(String.format("Error running leafStage for requestId=%s", requestId), e);
- if (mailboxSendOperator != null) {
- mailboxSendOperator.cancel(e);
- }
+ InstanceResponseBlock errorResponse = new InstanceResponseBlock();
+ errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
+ e.getMessage() + QueryException.getTruncatedStackTrace(e));
+ result = errorResponse;
}
+ return result;
}
private boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java
new file mode 100644
index 0000000000..faa4bf4945
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/LeafSchedulerService.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.operator.OpChainId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LeafSchedulerService {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LeafSchedulerService.class);
+
+ private final ExecutorService _executorService;
+ private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
+
+ public LeafSchedulerService(ExecutorService executorService) {
+ _executorService = executorService;
+ _submittedOpChainMap = new ConcurrentHashMap<>();
+ }
+
+ public void register(OpChain operatorChain) {
+ Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
+ @Override
+ public void runJob() {
+ boolean isFinished = false;
+ boolean returnedErrorBlock = false;
+ Throwable thrown = null;
+ try {
+ LOGGER.trace("({}): Executing", operatorChain);
+ operatorChain.getStats().executing();
+ TransferableBlock result = operatorChain.getRoot().nextBlock();
+ while (!result.isEndOfStreamBlock()) {
+ result = operatorChain.getRoot().nextBlock();
+ }
+ isFinished = true;
+ if (result.isErrorBlock()) {
+ returnedErrorBlock = true;
+ LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
+ result.getDataBlock().getExceptions());
+ } else {
+ LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
+ }
+ } catch (Exception e) {
+ LOGGER.error("({}): Failed to execute operator chain! {}", operatorChain, operatorChain.getStats(), e);
+ thrown = e;
+ } finally {
+ if (returnedErrorBlock || thrown != null) {
+ cancelOpChain(operatorChain, thrown);
+ } else if (isFinished) {
+ closeOpChain(operatorChain);
+ }
+ }
+ }
+ });
+ _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
+ }
+
+ public void cancel(long requestId) {
+ // simple cancellation. for leaf stage this cannot be a dangling opchain b/c they will eventually be cleared up
+ // via query timeout.
+ List<OpChainId> opChainIdsToCancel = _submittedOpChainMap.keySet()
+ .stream().filter(opChainId -> opChainId.getRequestId() == requestId).collect(Collectors.toList());
+ for (OpChainId opChainId : opChainIdsToCancel) {
+ Future<?> future = _submittedOpChainMap.get(opChainId);
+ if (future != null) {
+ future.cancel(true);
+ }
+ }
+ }
+
+ private void closeOpChain(OpChain opChain) {
+ opChain.close();
+ }
+
+ private void cancelOpChain(OpChain opChain, Throwable t) {
+ opChain.cancel(t);
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 5656baebe8..64e9117a92 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -24,8 +24,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
+import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
@@ -38,6 +40,7 @@ import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
@@ -62,22 +65,19 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR";
private static final Logger LOGGER = LoggerFactory.getLogger(LeafStageTransferableBlockOperator.class);
- private final InstanceResponseBlock _errorBlock;
- private final List<InstanceResponseBlock> _baseResultBlock;
+ private final LinkedList<ServerQueryRequest> _serverQueryRequestQueue;
private final DataSchema _desiredDataSchema;
- private int _currentIndex;
+ private final Function<ServerQueryRequest, InstanceResponseBlock> _processCall;
+
+ private InstanceResponseBlock _errorBlock;
public LeafStageTransferableBlockOperator(OpChainExecutionContext context,
- List<InstanceResponseBlock> baseResultBlock, DataSchema dataSchema) {
+ Function<ServerQueryRequest, InstanceResponseBlock> processCall,
+ List<ServerQueryRequest> serverQueryRequestList, DataSchema dataSchema) {
super(context);
- _baseResultBlock = baseResultBlock;
+ _processCall = processCall;
+ _serverQueryRequestQueue = new LinkedList<>(serverQueryRequestList);
_desiredDataSchema = dataSchema;
- _errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
- _currentIndex = 0;
- for (InstanceResponseBlock instanceResponseBlock : baseResultBlock) {
- OperatorStats operatorStats = _opChainStats.getOperatorStats(context, getOperatorId());
- operatorStats.recordExecutionStats(instanceResponseBlock.getResponseMetadata());
- }
}
@Override
@@ -93,27 +93,39 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
- if (_currentIndex < 0) {
+ if (_errorBlock != null) {
throw new RuntimeException("Leaf transfer terminated. next block should no longer be called.");
}
- if (_errorBlock != null) {
- _currentIndex = -1;
+ // runLeafStage
+ InstanceResponseBlock responseBlock = getNextBlockFromLeafStage();
+ if (responseBlock == null) {
+ // finished getting next block from leaf stage. returning EOS
+ return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+ } else if (!responseBlock.getExceptions().isEmpty()) {
+ // get error from leaf stage, return ERROR
+ _errorBlock = responseBlock;
return new TransferableBlock(DataBlockUtils.getErrorDataBlock(_errorBlock.getExceptions()));
} else {
- if (_currentIndex < _baseResultBlock.size()) {
- InstanceResponseBlock responseBlock = _baseResultBlock.get(_currentIndex++);
- if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) {
- return composeTransferableBlock(responseBlock, _desiredDataSchema);
- } else {
- return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW);
- }
+ // return normal block.
+ OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, getOperatorId());
+ operatorStats.recordExecutionStats(responseBlock.getResponseMetadata());
+ if (responseBlock.getResultsBlock() != null && responseBlock.getResultsBlock().getNumRows() > 0) {
+ return composeTransferableBlock(responseBlock, _desiredDataSchema);
} else {
- _currentIndex = -1;
- return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+ return new TransferableBlock(Collections.emptyList(), _desiredDataSchema, DataBlock.Type.ROW);
}
}
}
+ private @Nullable InstanceResponseBlock getNextBlockFromLeafStage() {
+ if (!_serverQueryRequestQueue.isEmpty()) {
+ ServerQueryRequest request = _serverQueryRequestQueue.pop();
+ return _processCall.apply(request);
+ } else {
+ return null;
+ }
+ }
+
/**
* Leaf stage operators should always collect stats for the tables used in queries
* Otherwise the Broker response will just contain zeros for every stat value
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
index 8e7061bff8..4cd92ab32e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
@@ -22,9 +22,7 @@ import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
-import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
@@ -64,7 +62,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
}
_queryRunner.start();
_server.start();
- } catch (IOException | TimeoutException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -77,7 +75,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
_server.shutdown();
_server.awaitTermination();
}
- } catch (InterruptedException | TimeoutException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 36df3de6ff..75401ca07f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -20,7 +20,6 @@ package org.apache.pinot.query;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.TimeoutException;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -119,7 +118,7 @@ public class QueryServerEnclosure {
public void shutDown() {
try {
_queryRunner.shutDown();
- } catch (TimeoutException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java
similarity index 99%
rename from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
rename to pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java
index fbbab9e4d9..4f7f8e5565 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/YieldingSchedulerServiceTest.java
@@ -41,7 +41,7 @@ import org.testng.annotations.Test;
import static org.mockito.Mockito.clearInvocations;
-public class OpChainSchedulerServiceTest {
+public class YieldingSchedulerServiceTest {
private ExecutorService _executor;
private AutoCloseable _mocks;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
index 9d04d07df9..dc2d2dcb75 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java
@@ -18,10 +18,13 @@
*/
package org.apache.pinot.query.runtime.operator;
+import com.google.common.annotations.VisibleForTesting;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.function.Function;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.Record;
@@ -32,6 +35,7 @@ import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -75,7 +79,8 @@ public class LeafStageTransferableBlockOperatorTest {
List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -101,7 +106,9 @@ public class LeafStageTransferableBlockOperatorTest {
new SelectionResultsBlock(resultSchema,
Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, desiredSchema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()),
+ desiredSchema);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -123,7 +130,8 @@ public class LeafStageTransferableBlockOperatorTest {
new SelectionResultsBlock(schema,
Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -148,7 +156,8 @@ public class LeafStageTransferableBlockOperatorTest {
queryContext),
new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
// When:
TransferableBlock resultBlock1 = operator.nextBlock();
@@ -178,12 +187,17 @@ public class LeafStageTransferableBlockOperatorTest {
errorBlock,
new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
// When:
TransferableBlock resultBlock = operator.nextBlock();
-
// Then:
+ Assert.assertEquals(resultBlock.getContainer().get(0), new Object[]{"foo", 1});
+ Assert.assertEquals(resultBlock.getContainer().get(1), new Object[]{"", 2});
+
+ // When:
+ resultBlock = operator.nextBlock();
Assert.assertTrue(resultBlock.isErrorBlock());
}
@@ -199,7 +213,8 @@ public class LeafStageTransferableBlockOperatorTest {
new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema,
Arrays.asList(new Record(new Object[]{1, "foo"}), new Record(new Object[]{2, "bar"})))), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -220,7 +235,8 @@ public class LeafStageTransferableBlockOperatorTest {
new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema,
Arrays.asList(new Record(new Object[]{"foo", 1}), new Record(new Object[]{"bar", 2})))), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -244,7 +260,8 @@ public class LeafStageTransferableBlockOperatorTest {
List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -267,7 +284,8 @@ public class LeafStageTransferableBlockOperatorTest {
List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -286,7 +304,8 @@ public class LeafStageTransferableBlockOperatorTest {
List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
new AggregationResultsBlock(queryContext.getAggregationFunctions(), Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(resultsBlockList), getStaticServerQueryRequests(resultsBlockList.size()), schema);
// When:
TransferableBlock resultBlock = operator.nextBlock();
@@ -308,7 +327,9 @@ public class LeafStageTransferableBlockOperatorTest {
List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
new InstanceResponseBlock(new SelectionResultsBlock(resultSchema, Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), responseBlockList, desiredSchema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(responseBlockList), getStaticServerQueryRequests(responseBlockList.size()),
+ desiredSchema);
TransferableBlock resultBlock = operator.nextBlock();
// Then:
@@ -331,7 +352,9 @@ public class LeafStageTransferableBlockOperatorTest {
new DistinctResultsBlock(mock(DistinctAggregationFunction.class),
new DistinctTable(resultSchema, Collections.emptyList())), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), responseBlockList, desiredSchema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(responseBlockList), getStaticServerQueryRequests(responseBlockList.size()),
+ desiredSchema);
TransferableBlock resultBlock = operator.nextBlock();
// Then:
@@ -353,11 +376,42 @@ public class LeafStageTransferableBlockOperatorTest {
List<InstanceResponseBlock> responseBlockList = Collections.singletonList(
new InstanceResponseBlock(new GroupByResultsBlock(resultSchema, Collections.emptyList()), queryContext));
LeafStageTransferableBlockOperator operator =
- new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), responseBlockList, desiredSchema);
+ new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(),
+ getStaticBlockProcessor(responseBlockList), getStaticServerQueryRequests(responseBlockList.size()),
+ desiredSchema);
TransferableBlock resultBlock = operator.nextBlock();
// Then:
Assert.assertEquals(resultBlock.getContainer().size(), 0);
Assert.assertEquals(resultBlock.getDataSchema(), desiredSchema);
}
+
+ @VisibleForTesting
+ static Function<ServerQueryRequest, InstanceResponseBlock> getStaticBlockProcessor(
+ List<InstanceResponseBlock> resultBlockList) {
+ return new StaticBlockProcessor(resultBlockList)::process;
+ }
+
+ static List<ServerQueryRequest> getStaticServerQueryRequests(int count) {
+ List<ServerQueryRequest> staticMockRequests = new ArrayList<>();
+ while (count > 0) {
+ staticMockRequests.add(mock(ServerQueryRequest.class));
+ count--;
+ }
+ return staticMockRequests;
+ }
+
+ private static class StaticBlockProcessor {
+ private final List<InstanceResponseBlock> _resultBlockList;
+ private int _currentIdx;
+
+ StaticBlockProcessor(List<InstanceResponseBlock> resultBlockList) {
+ _resultBlockList = resultBlockList;
+ _currentIdx = 0;
+ }
+
+ public InstanceResponseBlock process(ServerQueryRequest request) {
+ return _resultBlockList.get(_currentIdx++);
+ }
+ }
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 3ec9fc71de..585e884e65 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.query.mailbox.MailboxService;
@@ -57,6 +58,7 @@ import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -286,8 +288,9 @@ public class OpChainTest {
QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT intCol FROM tbl");
List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
new SelectionResultsBlock(upStreamSchema, Arrays.asList(new Object[]{1}, new Object[]{2})), queryContext));
- LeafStageTransferableBlockOperator leafOp =
- new LeafStageTransferableBlockOperator(context, resultsBlockList, upStreamSchema);
+ LeafStageTransferableBlockOperator leafOp = new LeafStageTransferableBlockOperator(context,
+ LeafStageTransferableBlockOperatorTest.getStaticBlockProcessor(resultsBlockList),
+ Collections.singletonList(mock(ServerQueryRequest.class)), upStreamSchema);
//Transform operator
RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org