You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2023/04/11 04:38:51 UTC
[pinot] branch master updated: Do not record operator stats when tracing is disabled (#10447)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe7a72b9d2 Do not record operator stats when tracing is disabled (#10447)
fe7a72b9d2 is described below
commit fe7a72b9d27d8764670778474595ebe18d101e16
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Tue Apr 11 10:08:45 2023 +0530
Do not record operator stats when tracing is disabled (#10447)
* Add tracing parameter to opChainContext
* Do not record stats when tracing is disabled
* Do not collect stats at end of leafstage
* Do not fill stage stats when tracing is disabled
* Fix tests
* Record stageWallTime even when tracing is disabled
* Add javadocs for operators
* Add more operators in the chain
* Bug fix: End time pointing to serialization time and not operator end time
* Change test to chain actual operators instead of mock
* Bug fix: Record stage block and row stats even when tracing is disabled
* Fix tests after rebasing with SortedReceivedOperator PR
* Table names should always be populated in the stats
* Fix tests
---------
Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
Co-authored-by: Kartik Khare <kh...@kartiks-macbook-pro.tail8a064.ts.net>
---
.../MultiStageBrokerRequestHandler.java | 4 +-
.../query/reduce/ExecutionStatsAggregator.java | 15 +-
.../apache/pinot/query/runtime/QueryRunner.java | 9 +-
.../LeafStageTransferableBlockOperator.java | 9 +
.../runtime/operator/MailboxSendOperator.java | 9 +
.../query/runtime/operator/MultiStageOperator.java | 19 +-
.../pinot/query/runtime/operator/OpChainStats.java | 12 +-
.../query/runtime/operator/OperatorStats.java | 15 +-
.../runtime/plan/OpChainExecutionContext.java | 11 +-
.../query/runtime/plan/PlanRequestContext.java | 9 +-
.../runtime/plan/ServerRequestPlanVisitor.java | 3 +-
.../plan/server/ServerPlanRequestContext.java | 4 +-
.../query/service/dispatch/QueryDispatcher.java | 22 +-
.../pinot/query/runtime/QueryRunnerTest.java | 3 +-
.../pinot/query/runtime/QueryRunnerTestBase.java | 16 +-
.../executor/OpChainSchedulerServiceTest.java | 2 +-
.../runtime/executor/RoundRobinSchedulerTest.java | 2 +-
.../operator/MailboxReceiveOperatorTest.java | 32 +--
.../runtime/operator/MailboxSendOperatorTest.java | 7 +-
.../pinot/query/runtime/operator/OpChainTest.java | 257 ++++++++++++++++++++-
.../query/runtime/operator/OperatorTestUtil.java | 10 +-
.../operator/SortedMailboxReceiveOperatorTest.java | 38 +--
.../runtime/queries/ResourceBasedQueriesTest.java | 5 +-
.../service/dispatch/QueryDispatcherTest.java | 4 +-
24 files changed, 425 insertions(+), 92 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index ebb7b7dc3e..9e74dcf400 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -209,14 +209,14 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
ResultTable queryResults;
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
- for (Integer stageId: queryPlan.getStageMetadataMap().keySet()) {
+ for (Integer stageId : queryPlan.getStageMetadataMap().keySet()) {
stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled));
}
long executionStartTimeNs = System.nanoTime();
try {
queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, _mailboxService, queryTimeoutMs,
- sqlNodeAndOptions.getOptions(), stageIdStatsMap);
+ sqlNodeAndOptions.getOptions(), stageIdStatsMap, traceEnabled);
} catch (Exception e) {
LOGGER.info("query execution failed", e);
return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index c8ef48af4a..bc8612c551 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.query.reduce;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -363,19 +362,19 @@ public class ExecutionStatsAggregator {
public void setStageLevelStats(@Nullable String rawTableName, BrokerResponseStats brokerResponseStats,
@Nullable BrokerMetrics brokerMetrics) {
- setStats(rawTableName, brokerResponseStats, brokerMetrics);
+ if (_enableTrace) {
+ setStats(rawTableName, brokerResponseStats, brokerMetrics);
+ brokerResponseStats.setOperatorStats(_operatorStats);
+ }
brokerResponseStats.setNumBlocks(_numBlocks);
brokerResponseStats.setNumRows(_numRows);
brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs);
- brokerResponseStats.setStageExecWallTimeMs(_stageExecEndTimeMs - _stageExecStartTimeMs);
brokerResponseStats.setStageExecutionUnit(_stageExecutionUnit);
- if (_enableTrace) {
- brokerResponseStats.setOperatorStats(_operatorStats);
- } else {
- brokerResponseStats.setOperatorStats(Collections.emptyMap());
- }
brokerResponseStats.setTableNames(new ArrayList<>(_tableNames));
+ if (_stageExecStartTimeMs >= 0 && _stageExecEndTimeMs >= 0) {
+ brokerResponseStats.setStageExecWallTimeMs(_stageExecEndTimeMs - _stageExecStartTimeMs);
+ }
}
private void withNotNullLongMetadata(Map<String, String> metadata, DataTable.MetadataKey key, LongConsumer consumer) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 097b05b455..a1cfafd8ee 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -164,6 +164,8 @@ public class QueryRunner {
public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
+ boolean isTraceEnabled =
+ Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
long deadlineMs = System.currentTimeMillis() + timeoutMs;
if (isLeafStage(distributedStagePlan)) {
runLeafStage(distributedStagePlan, requestMetadataMap, deadlineMs, requestId);
@@ -171,7 +173,8 @@ public class QueryRunner {
StageNode stageRoot = distributedStagePlan.getStageRoot();
OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
new PlanRequestContext(_mailboxService, requestId, stageRoot.getStageId(), timeoutMs, deadlineMs,
- new VirtualServerAddress(distributedStagePlan.getServer()), distributedStagePlan.getMetadataMap()));
+ new VirtualServerAddress(distributedStagePlan.getServer()), distributedStagePlan.getMetadataMap(),
+ isTraceEnabled));
_scheduler.register(rootOperator);
}
}
@@ -195,6 +198,8 @@ public class QueryRunner {
// server executor.
MailboxSendOperator mailboxSendOperator = null;
try {
+ boolean isTraceEnabled =
+ Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
long leafStageStartMillis = System.currentTimeMillis();
List<ServerPlanRequestContext> serverQueryRequests =
constructServerQueryRequests(distributedStagePlan, requestMetadataMap, _helixPropertyStore, _mailboxService,
@@ -213,7 +218,7 @@ public class QueryRunner {
MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
OpChainExecutionContext opChainExecutionContext =
new OpChainExecutionContext(_mailboxService, requestId, sendNode.getStageId(), _rootServer, deadlineMs,
- deadlineMs, distributedStagePlan.getMetadataMap());
+ deadlineMs, distributedStagePlan.getMetadataMap(), isTraceEnabled);
MultiStageOperator leafStageOperator =
new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema());
mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext, leafStageOperator,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index a6ed81a3d8..5656baebe8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -114,6 +114,15 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
}
}
+ /**
+ * Leaf stage operators should always collect stats for the tables used in queries.
+ * Otherwise the Broker response will just contain zeros for every stat value.
+ */
+ @Override
+ protected boolean shouldCollectStats() {
+ return true;
+ }
+
/**
* This data transfer block compose method is here to ensure that V1 results match the expected projection
* schema in the Calcite logical operator.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index ad0c24abb7..bb807cb22c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -178,6 +178,15 @@ public class MailboxSendOperator extends MultiStageOperator {
return transferableBlock;
}
+ /**
+ * This method is overridden to return true because this operator is last in the chain and needs to collect
+ * execution time stats
+ */
+ @Override
+ protected boolean shouldCollectStats() {
+ return true;
+ }
+
@Override
public void close() {
super.close();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 881c499c0f..6b9f630eb5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -49,11 +49,16 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
throw new EarlyTerminationException("Interrupted while processing next block");
}
try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) {
- OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
- operatorStats.startTimer();
- TransferableBlock nextBlock = getNextBlock();
- operatorStats.recordRow(1, nextBlock.getNumRows());
- operatorStats.endTimer(nextBlock);
+ TransferableBlock nextBlock;
+ if (shouldCollectStats()) {
+ OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
+ operatorStats.startTimer();
+ nextBlock = getNextBlock();
+ operatorStats.recordRow(1, nextBlock.getNumRows());
+ operatorStats.endTimer(nextBlock);
+ } else {
+ nextBlock = getNextBlock();
+ }
return nextBlock;
}
}
@@ -65,6 +70,10 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
// Make it protected because we should always call nextBlock()
protected abstract TransferableBlock getNextBlock();
+ protected boolean shouldCollectStats() {
+ return _context.isTraceEnabled();
+ }
+
@Override
public List<MultiStageOperator> getChildOperators() {
throw new UnsupportedOperationException();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
index 5b2cc2a065..6c259eaff2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
@@ -79,11 +79,13 @@ public class OpChainStats {
}
public OperatorStats getOperatorStats(OpChainExecutionContext context, String operatorId) {
- return _operatorStatsMap.computeIfAbsent(operatorId, (id) -> {
- OperatorStats operatorStats = new OperatorStats(context);
- operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), operatorId);
- return operatorStats;
- });
+ return _operatorStatsMap.computeIfAbsent(operatorId, (id) -> {
+ OperatorStats operatorStats = new OperatorStats(context);
+ if (context.isTraceEnabled()) {
+ operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), operatorId);
+ }
+ return operatorStats;
+ });
}
private void startExecutionTimer() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 2655a5c286..5985e23b13 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -41,6 +41,7 @@ public class OperatorStats {
private int _numBlock = 0;
private int _numRows = 0;
private long _startTimeMs = -1;
+ private long _endTimeMs = -1;
private final Map<String, String> _executionStats;
private boolean _processingStarted = false;
@@ -66,9 +67,11 @@ public class OperatorStats {
public void endTimer(TransferableBlock block) {
if (_executeStopwatch.isRunning()) {
_executeStopwatch.stop();
+ _endTimeMs = System.currentTimeMillis();
}
if (!_processingStarted && block.isNoOpBlock()) {
_startTimeMs = -1;
+ _endTimeMs = -1;
_executeStopwatch.reset();
} else {
_processingStarted = true;
@@ -94,10 +97,14 @@ public class OperatorStats {
_executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName(),
String.valueOf(_executeStopwatch.elapsed(TimeUnit.MILLISECONDS)));
// Wall time is recorded as slightly longer than the actual execution time, but that is OK.
- _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_START_TIME_MS.getName(),
- String.valueOf(_startTimeMs));
- _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_END_TIME_MS.getName(),
- String.valueOf(System.currentTimeMillis()));
+
+ if (_startTimeMs != -1) {
+ _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_START_TIME_MS.getName(),
+ String.valueOf(_startTimeMs));
+ long endTimeMs = _endTimeMs == -1 ? System.currentTimeMillis() : _endTimeMs;
+ _executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_END_TIME_MS.getName(),
+ String.valueOf(endTimeMs));
+ }
return _executionStats;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 64141a024b..68d8d4b4ae 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -42,9 +42,11 @@ public class OpChainExecutionContext {
private final Map<Integer, StageMetadata> _metadataMap;
private final OpChainId _id;
private final OpChainStats _stats;
+ private final boolean _traceEnabled;
public OpChainExecutionContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId,
- VirtualServerAddress server, long timeoutMs, long deadlineMs, Map<Integer, StageMetadata> metadataMap) {
+ VirtualServerAddress server, long timeoutMs, long deadlineMs, Map<Integer, StageMetadata> metadataMap,
+ boolean traceEnabled) {
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
@@ -54,12 +56,13 @@ public class OpChainExecutionContext {
_metadataMap = metadataMap;
_id = new OpChainId(requestId, server.virtualId(), stageId);
_stats = new OpChainStats(_id.toString());
+ _traceEnabled = traceEnabled;
}
public OpChainExecutionContext(PlanRequestContext planRequestContext) {
this(planRequestContext.getMailboxService(), planRequestContext.getRequestId(), planRequestContext.getStageId(),
planRequestContext.getServer(), planRequestContext.getTimeoutMs(), planRequestContext.getDeadlineMs(),
- planRequestContext.getMetadataMap());
+ planRequestContext.getMetadataMap(), planRequestContext.isTraceEnabled());
}
public MailboxService<TransferableBlock> getMailboxService() {
@@ -97,4 +100,8 @@ public class OpChainExecutionContext {
public OpChainStats getStats() {
return _stats;
}
+
+ public boolean isTraceEnabled() {
+ return _traceEnabled;
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
index fb610ebb46..21899d7d34 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
@@ -40,9 +40,11 @@ public class PlanRequestContext {
protected final Map<Integer, StageMetadata> _metadataMap;
protected final List<MailboxIdentifier> _receivingMailboxes = new ArrayList<>();
private final OpChainExecutionContext _opChainExecutionContext;
+ private final boolean _traceEnabled;
public PlanRequestContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId,
- long timeoutMs, long deadlineMs, VirtualServerAddress server, Map<Integer, StageMetadata> metadataMap) {
+ long timeoutMs, long deadlineMs, VirtualServerAddress server, Map<Integer, StageMetadata> metadataMap,
+ boolean traceEnabled) {
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
@@ -50,6 +52,7 @@ public class PlanRequestContext {
_deadlineMs = deadlineMs;
_server = server;
_metadataMap = metadataMap;
+ _traceEnabled = traceEnabled;
_opChainExecutionContext = new OpChainExecutionContext(this);
}
@@ -92,4 +95,8 @@ public class PlanRequestContext {
public OpChainExecutionContext getOpChainExecutionContext() {
return _opChainExecutionContext;
}
+
+ public boolean isTraceEnabled() {
+ return _traceEnabled;
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index b6194bd15b..6255a99ddc 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -97,6 +97,7 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl
long requestId = (Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)) << 16)
+ (stagePlan.getStageId() << 8) + (tableType == TableType.REALTIME ? 1 : 0);
long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
+ boolean traceEnabled = Boolean.parseBoolean(requestMetadataMap.get(CommonConstants.Broker.Request.TRACE));
PinotQuery pinotQuery = new PinotQuery();
Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(requestMetadataMap);
if (leafNodeLimit != null) {
@@ -109,7 +110,7 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl
ServerPlanRequestContext context =
new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, deadlineMs,
new VirtualServerAddress(stagePlan.getServer()), stagePlan.getMetadataMap(), pinotQuery, tableType,
- timeBoundaryInfo);
+ timeBoundaryInfo, traceEnabled);
// visit the plan and create query physical plan.
ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 4403d35e6d..c9ce748101 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -43,8 +43,8 @@ public class ServerPlanRequestContext extends PlanRequestContext {
public ServerPlanRequestContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId,
long timeoutMs, long deadlineMs, VirtualServerAddress server, Map<Integer, StageMetadata> metadataMap,
- PinotQuery pinotQuery, TableType tableType, TimeBoundaryInfo timeBoundaryInfo) {
- super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, metadataMap);
+ PinotQuery pinotQuery, TableType tableType, TimeBoundaryInfo timeBoundaryInfo, boolean traceEnabled) {
+ super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, metadataMap, traceEnabled);
_pinotQuery = pinotQuery;
_tableType = tableType;
_timeBoundaryInfo = timeBoundaryInfo;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 011fed8472..6d113c9fa2 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -85,13 +85,14 @@ public class QueryDispatcher {
public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
MailboxService<TransferableBlock> mailboxService, long timeoutMs, Map<String, String> queryOptions,
- Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
+ Map<Integer, ExecutionStatsAggregator> executionStatsAggregator, boolean traceEnabled)
throws Exception {
try {
// submit all the distributed stages.
int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
// run reduce stage and return result.
- return runReducer(requestId, queryPlan, reduceStageId, timeoutMs, mailboxService, executionStatsAggregator);
+ return runReducer(requestId, queryPlan, reduceStageId, timeoutMs, mailboxService, executionStatsAggregator,
+ traceEnabled);
} catch (Exception e) {
cancel(requestId, queryPlan);
throw new RuntimeException("Error executing query: " + ExplainPlanStageVisitor.explain(queryPlan), e);
@@ -175,13 +176,14 @@ public class QueryDispatcher {
@VisibleForTesting
public static ResultTable runReducer(long requestId, QueryPlan queryPlan, int reduceStageId, long timeoutMs,
- MailboxService<TransferableBlock> mailboxService, Map<Integer, ExecutionStatsAggregator> statsAggregatorMap) {
+ MailboxService<TransferableBlock> mailboxService, Map<Integer, ExecutionStatsAggregator> statsAggregatorMap,
+ boolean traceEnabled) {
MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId);
VirtualServerAddress server =
new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0);
OpChainExecutionContext context =
new OpChainExecutionContext(mailboxService, requestId, reduceStageId, server, timeoutMs, timeoutMs,
- queryPlan.getStageMetadataMap());
+ queryPlan.getStageMetadataMap(), traceEnabled);
MailboxReceiveOperator mailboxReceiveOperator =
createReduceStageOperator(reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(), context);
List<DataBlock> resultDataBlocks =
@@ -221,12 +223,14 @@ public class QueryDispatcher {
OperatorStats operatorStats = entry.getValue();
ExecutionStatsAggregator rootStatsAggregator = executionStatsAggregatorMap.get(0);
ExecutionStatsAggregator stageStatsAggregator = executionStatsAggregatorMap.get(operatorStats.getStageId());
- if (queryPlan != null) {
- StageMetadata operatorStageMetadata = queryPlan.getStageMetadataMap().get(operatorStats.getStageId());
- OperatorUtils.recordTableName(operatorStats, operatorStageMetadata);
- }
rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
- stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+ if (stageStatsAggregator != null) {
+ if (queryPlan != null) {
+ StageMetadata operatorStageMetadata = queryPlan.getStageMetadataMap().get(operatorStats.getStageId());
+ OperatorUtils.recordTableName(operatorStats, operatorStageMetadata);
+ }
+ stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+ }
}
}
return resultDataBlocks;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 649a2bee05..e1e2e17ab1 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -196,7 +196,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
try {
QueryDispatcher.runReducer(requestId, queryPlan, reducerStageId,
- Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)), _mailboxService, null);
+ Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)), _mailboxService, null,
+ false);
} catch (RuntimeException rte) {
Assert.assertTrue(rte.getMessage().contains("Received error query execution result block"));
Assert.assertTrue(rte.getMessage().contains(exceptionMsg), "Exception should contain: " + exceptionMsg
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 3bd3156ce6..affc0317a0 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.runtime;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import com.google.common.math.DoubleMath;
import java.math.BigDecimal;
import java.sql.Connection;
@@ -88,10 +87,15 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
protected List<Object[]> queryRunner(String sql, Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
- Map<String, String> requestMetadataMap =
- ImmutableMap.of(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId),
- QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
- String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+ Map<String, String> requestMetadataMap = new HashMap<>();
+ requestMetadataMap.put(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId));
+ requestMetadataMap.put(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
+ String.valueOf(CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
+
+ if (executionStatsAggregatorMap != null) {
+ requestMetadataMap.put(CommonConstants.Broker.Request.TRACE, "true");
+ }
+
int reducerStageId = -1;
for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
if (queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode) {
@@ -110,7 +114,7 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
Preconditions.checkState(reducerStageId != -1);
ResultTable resultTable = QueryDispatcher.runReducer(requestId, queryPlan, reducerStageId,
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)), _mailboxService,
- executionStatsAggregatorMap);
+ executionStatsAggregatorMap, true);
return resultTable.getRows();
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index a30adf04f4..fbbab9e4d9 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -74,7 +74,7 @@ public class OpChainSchedulerServiceTest {
private OpChain getChain(MultiStageOperator operator) {
VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1);
- OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, 0, null);
+ OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, 0, null, true);
return new OpChain(context, operator, ImmutableList.of());
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 34b39a697b..1f8eb9e254 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -184,6 +184,6 @@ public class RoundRobinSchedulerTest {
private OpChainExecutionContext getOpChainExecutionContext(long requestId, int stageId, int virtualServerId) {
return new OpChainExecutionContext(null, requestId, stageId,
- new VirtualServerAddress("localhost", 1234, virtualServerId), 0, 0, null);
+ new VirtualServerAddress("localhost", 1234, virtualServerId), 0, 0, null, true);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 7e932067e1..cb67b22127 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -86,7 +86,7 @@ public class MailboxReceiveOperatorTest {
// shorter timeoutMs should result in error.
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 10L, 10L,
- new HashMap<>());
+ new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 10L);
Thread.sleep(200L);
@@ -97,13 +97,13 @@ public class MailboxReceiveOperatorTest {
// longer timeout or default timeout (10s) doesn't result in error.
context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 2000L, 2000L,
- new HashMap<>());
+ new HashMap<>(), false);
receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 2000L);
Thread.sleep(200L);
mailbox = receiveOp.nextBlock();
Assert.assertFalse(mailbox.isErrorBlock());
context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, null);
Thread.sleep(200L);
mailbox = receiveOp.nextBlock();
@@ -125,7 +125,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
456, 789, null);
@@ -144,7 +144,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
RelDistribution.Type.RANGE_DISTRIBUTED, 456, 789, null);
}
@@ -172,7 +172,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -210,7 +210,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -251,7 +251,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -290,7 +290,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -332,7 +332,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -378,7 +378,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -424,7 +424,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
@@ -474,7 +474,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
@@ -526,7 +526,7 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
@@ -585,7 +585,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
@@ -632,7 +632,7 @@ public class MailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 25b307b22b..fe363d9e8b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -194,6 +194,11 @@ public class MailboxSendOperatorTest {
ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
Mockito.verify(_exchange).send(captor.capture());
Assert.assertSame(captor.getValue().getType(), DataBlock.Type.ROW, "expected data block to propagate");
+
+ // EOS block should contain statistics
+ Assert.assertFalse(context.getStats().getOperatorStatsMap().isEmpty());
+ Assert.assertEquals(context.getStats().getOperatorStatsMap().size(), 1);
+ Assert.assertTrue(context.getStats().getOperatorStatsMap().containsKey(operator.getOperatorId()));
}
private static TransferableBlock block(DataSchema schema, Object[]... rows) {
@@ -206,7 +211,7 @@ public class MailboxSendOperatorTest {
Map<Integer, StageMetadata> stageMetadataMap = Collections.singletonMap(DEFAULT_RECEIVER_STAGE_ID, stageMetadata);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_SENDER_STAGE_ID, new VirtualServerAddress(_server),
- deadlineMs, deadlineMs, stageMetadataMap);
+ deadlineMs, deadlineMs, stageMetadataMap, false);
return context;
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 6f83f9ac77..56ff2b36e3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -18,13 +18,34 @@
*/
package org.apache.pinot.query.runtime.operator;
+import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Stack;
import javax.annotation.Nullable;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.data.FieldSpec;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
@@ -39,9 +60,62 @@ public class OpChainTest {
@Mock
private MultiStageOperator _upstreamOperator;
+ private static int _numOperatorsInitialized = 0;
+ private final List<TransferableBlock> _blockList = new ArrayList<>();
+
+ @Mock
+ private MailboxService<TransferableBlock> _mailboxService;
+ @Mock
+ private VirtualServer _server;
+ @Mock
+ private KeySelector<Object[], Object[]> _selector;
+ @Mock
+ private MailboxSendOperator.BlockExchangeFactory _exchangeFactory;
+ @Mock
+ private BlockExchange _exchange;
+
+ @Mock
+ private ReceivingMailbox<TransferableBlock> _mailbox;
+
+
+ @Mock
+ private MailboxService<TransferableBlock> _mailboxService2;
+ @Mock
+ private ReceivingMailbox<TransferableBlock> _mailbox2;
+
+
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
+
+ Mockito.when(_exchangeFactory.build(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
+ Mockito.anyLong())).thenReturn(_exchange);
+
+ Mockito.when(_server.getHostname()).thenReturn("mock");
+ Mockito.when(_server.getQueryMailboxPort()).thenReturn(0);
+ Mockito.when(_server.getVirtualId()).thenReturn(0);
+
+ Mockito.when(_mailboxService.getReceivingMailbox(Mockito.any())).thenReturn(_mailbox);
+ Mockito.when(_mailboxService2.getReceivingMailbox(Mockito.any())).thenReturn(_mailbox2);
+
+ try {
+ Mockito.doAnswer(invocation -> {
+ TransferableBlock arg = invocation.getArgument(0);
+ _blockList.add(arg);
+ return null;
+ }).when(_exchange).send(Mockito.any(TransferableBlock.class));
+
+ Mockito.when(_mailbox2.receive()).then(x -> {
+ if (_blockList.isEmpty()) {
+ return TransferableBlockUtils.getNoOpTransferableBlock();
+ }
+ return _blockList.remove(0);
+ });
+
+ Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@AfterMethod
@@ -79,7 +153,7 @@ public class OpChainTest {
}
@Test
- public void testStatsCollection() {
+ public void testStatsCollectionTracingEnabled() {
OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
DummyMultiStageOperator dummyMultiStageOperator = new DummyMultiStageOperator(context);
@@ -100,7 +174,159 @@ public class OpChainTest {
Long.parseLong(executionStats.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName())) <= 2000);
}
+ @Test
+ public void testStatsCollectionTracingDisabled() {
+ OpChainExecutionContext context = OperatorTestUtil.getDefaultContextWithTracingDisabled();
+ DummyMultiStageOperator dummyMultiStageOperator = new DummyMultiStageOperator(context);
+
+ OpChain opChain = new OpChain(context, dummyMultiStageOperator, new ArrayList<>());
+ opChain.getStats().executing();
+ opChain.getRoot().nextBlock();
+ opChain.getStats().queued();
+
+ Assert.assertTrue(opChain.getStats().getExecutionTime() >= 1000);
+ Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), 0);
+ }
+
+ @Test
+ public void testStatsCollectionTracingEnabledMultipleOperators() {
+ long dummyOperatorWaitTime = 1000L;
+
+ int receivedStageId = 2;
+ int senderStageId = 1;
+ StageMetadata stageMetadata = new StageMetadata();
+ stageMetadata.setServerInstances(ImmutableList.of(_server));
+ Map<Integer, StageMetadata> stageMetadataMap = Collections.singletonMap(receivedStageId, stageMetadata);
+ OpChainExecutionContext context =
+ new OpChainExecutionContext(_mailboxService, 1, senderStageId, new VirtualServerAddress(_server), 1000, 1000,
+ stageMetadataMap, true);
+
+ Stack<MultiStageOperator> operators =
+ getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
+
+ OpChain opChain = new OpChain(context, operators.peek(), new ArrayList<>());
+ opChain.getStats().executing();
+ while (!opChain.getRoot().nextBlock().isEndOfStreamBlock()) {
+ // Drain the opchain
+ }
+ opChain.getStats().queued();
+
+ OpChainExecutionContext secondStageContext =
+ new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, new VirtualServerAddress(_server), 1000,
+ 1000, stageMetadataMap, true);
+
+ MailboxReceiveOperator secondStageReceiveOp =
+ new MailboxReceiveOperator(secondStageContext, ImmutableList.of(_server),
+ RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId, receivedStageId + 1, null);
+
+ Assert.assertTrue(opChain.getStats().getExecutionTime() >= dummyOperatorWaitTime);
+ int numOperators = operators.size();
+ Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), numOperators);
+ while (!operators.isEmpty()) {
+ Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(operators.pop().getOperatorId()));
+ }
+
+ while (!secondStageReceiveOp.nextBlock().isEndOfStreamBlock()) {
+ // Drain the mailbox
+ }
+ Assert.assertEquals(secondStageContext.getStats().getOperatorStatsMap().size(), numOperators + 1);
+ }
+
+ @Test
+ public void testStatsCollectionTracingDisableMultipleOperators() {
+ long dummyOperatorWaitTime = 1000L;
+
+ int receivedStageId = 2;
+ int senderStageId = 1;
+ StageMetadata stageMetadata = new StageMetadata();
+ stageMetadata.setServerInstances(ImmutableList.of(_server));
+ Map<Integer, StageMetadata> stageMetadataMap = Collections.singletonMap(receivedStageId, stageMetadata);
+ OpChainExecutionContext context =
+ new OpChainExecutionContext(_mailboxService, 1, senderStageId, new VirtualServerAddress(_server), 1000, 1000,
+ stageMetadataMap, false);
+
+ Stack<MultiStageOperator> operators =
+ getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
+
+ OpChain opChain = new OpChain(context, operators.peek(), new ArrayList<>());
+ opChain.getStats().executing();
+ opChain.getRoot().nextBlock();
+ opChain.getStats().queued();
+
+ OpChainExecutionContext secondStageContext =
+ new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, new VirtualServerAddress(_server), 1000,
+ 1000, stageMetadataMap, false);
+
+ MailboxReceiveOperator secondStageReceiveOp =
+ new MailboxReceiveOperator(secondStageContext, ImmutableList.of(_server),
+ RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId, receivedStageId + 1, null);
+
+ Assert.assertTrue(opChain.getStats().getExecutionTime() >= dummyOperatorWaitTime);
+ Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), 2);
+ Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(operators.pop().getOperatorId()));
+
+ while (!secondStageReceiveOp.nextBlock().isEndOfStreamBlock()) {
+ // Drain the mailbox
+ }
+
+ while (!operators.isEmpty()) {
+ MultiStageOperator operator = operators.pop();
+ if (operator.toExplainString().contains("SEND") || operator.toExplainString().contains("LEAF")) {
+ Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(operator.getOperatorId()));
+ }
+ }
+ Assert.assertEquals(secondStageContext.getStats().getOperatorStatsMap().size(), 2);
+ }
+
+ private Stack<MultiStageOperator> getFullOpchain(int receivedStageId, int senderStageId,
+ OpChainExecutionContext context, long waitTimeInMillis) {
+ Stack<MultiStageOperator> operators = new Stack<>();
+ DataSchema upStreamSchema =
+ new DataSchema(new String[]{"intCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+ //Mailbox Receive Operator
+ try {
+ Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(upStreamSchema, new Object[]{1}),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ } catch (Exception e) {
+ Assert.fail("Exception while mocking mailbox receive: " + e.getMessage());
+ }
+
+ QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT intCol FROM tbl");
+ List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
+ new SelectionResultsBlock(upStreamSchema, Arrays.asList(new Object[]{1}, new Object[]{2})), queryContext));
+ LeafStageTransferableBlockOperator leafOp =
+ new LeafStageTransferableBlockOperator(context, resultsBlockList, upStreamSchema);
+
+ //Transform operator
+ RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
+ TransformOperator transformOp =
+ new TransformOperator(context, leafOp, upStreamSchema, ImmutableList.of(ref0), upStreamSchema);
+
+ //Filter operator
+ RexExpression booleanLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+ FilterOperator filterOp = new FilterOperator(context, transformOp, upStreamSchema, booleanLiteral);
+
+ // Dummy operator
+ MultiStageOperator dummyWaitOperator = new DummyMultiStageCallableOperator(context, filterOp, waitTimeInMillis);
+
+ //Mailbox Send operator
+ MailboxSendOperator sendOperator =
+ new MailboxSendOperator(context, dummyWaitOperator, RelDistribution.Type.HASH_DISTRIBUTED, _selector, null,
+ null, false,
+ server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", senderStageId, receivedStageId),
+ _exchangeFactory, receivedStageId);
+
+ operators.push(leafOp);
+ operators.push(transformOp);
+ operators.push(filterOp);
+ operators.push(dummyWaitOperator);
+ operators.push(sendOperator);
+ return operators;
+ }
+
+
static class DummyMultiStageOperator extends MultiStageOperator {
+
public DummyMultiStageOperator(OpChainExecutionContext context) {
super(context);
}
@@ -121,4 +347,33 @@ public class OpChainTest {
return "DUMMY";
}
}
+
+ static class DummyMultiStageCallableOperator extends MultiStageOperator {
+ private final MultiStageOperator _upstream;
+ private final long _sleepTimeInMillis;
+
+ public DummyMultiStageCallableOperator(OpChainExecutionContext context, MultiStageOperator upstream,
+ long sleepTimeInMillis) {
+ super(context);
+ _upstream = upstream;
+ _sleepTimeInMillis = sleepTimeInMillis;
+ }
+
+ @Override
+ protected TransferableBlock getNextBlock() {
+ try {
+ Thread.sleep(_sleepTimeInMillis);
+ _upstream.nextBlock();
+ } catch (InterruptedException e) {
+ // IGNORE
+ }
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ }
+
+ @Nullable
+ @Override
+ public String toExplainString() {
+ return "DUMMY_" + _numOperatorsInitialized++;
+ }
+ }
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index cd1f3ffe8d..575df48d30 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -65,12 +65,18 @@ public class OperatorTestUtil {
public static OpChainExecutionContext getDefaultContext() {
VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE,
- new HashMap<>());
+ new HashMap<>(), true);
+ }
+
+ public static OpChainExecutionContext getDefaultContextWithTracingDisabled() {
+ VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
+ return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE,
+ new HashMap<>(), false);
}
public static OpChainExecutionContext getContext(long requestId, int stageId,
VirtualServerAddress virtualServerAddress) {
return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE,
- new HashMap<>());
+ new HashMap<>(), true);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 659f7e8860..e306875cf3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -97,7 +97,7 @@ public class SortedMailboxReceiveOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 10L, 10L,
- new HashMap<>());
+ new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
collationDirections, false, true, inSchema, 456, 789, 10L);
@@ -109,14 +109,14 @@ public class SortedMailboxReceiveOperatorTest {
// longer timeout or default timeout (10s) doesn't result in error.
context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 2000L, 2000L,
- new HashMap<>());
+ new HashMap<>(), false);
receiveOp = new SortedMailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON,
collationKeys, collationDirections, false, true, inSchema, 456, 789, 2000L);
Thread.sleep(200L);
mailbox = receiveOp.nextBlock();
Assert.assertFalse(mailbox.isErrorBlock());
context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
receiveOp = new SortedMailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON,
collationKeys, collationDirections, false, true, inSchema, 456, 789, null);
Thread.sleep(200L);
@@ -146,7 +146,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
collationKeys, collationDirections, false, true, inSchema, 456, 789, null);
@@ -172,7 +172,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
ImmutableList.of(_server1, _server2), RelDistribution.Type.RANGE_DISTRIBUTED, collationKeys,
collationDirections, false, true, inSchema, 456, 789, null);
@@ -208,7 +208,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -254,7 +254,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -303,7 +303,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -350,7 +350,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -398,7 +398,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -450,7 +450,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
@@ -489,7 +489,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
@@ -526,7 +526,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
@@ -574,7 +574,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
@@ -630,7 +630,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
@@ -686,7 +686,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
@@ -739,7 +739,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp =
new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
@@ -798,7 +798,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection,
@@ -882,7 +882,7 @@ public class SortedMailboxReceiveOperatorTest {
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
+ Long.MAX_VALUE, new HashMap<>(), false);
SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection,
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 4911f3b57f..614f983c66 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -246,6 +246,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
Assert.assertEquals(brokerResponseNative.getNumSegmentsQueried(), numSegments);
Map<Integer, BrokerResponseStats> stageIdStats = brokerResponseNative.getStageIdStats();
+ int numTables = 0;
for (Integer stageId : stageIdStats.keySet()) {
// check stats only for leaf stage
BrokerResponseStats brokerResponseStats = stageIdStats.get(stageId);
@@ -256,7 +257,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
String tableName = brokerResponseStats.getTableNames().get(0);
Assert.assertEquals(brokerResponseStats.getTableNames().size(), 1);
-
+ numTables++;
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType == null) {
tableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
@@ -275,6 +276,8 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
}
}
}
+
+ Assert.assertTrue(numTables > 0);
});
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 9adf5fe627..af98b32d12 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -138,7 +138,7 @@ public class QueryDispatcherTest extends QueryTestSet {
QueryDispatcher dispatcher = new QueryDispatcher();
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
try {
- dispatcher.submitAndReduce(requestId, queryPlan, null, 10_000L, new HashMap<>(), null);
+ dispatcher.submitAndReduce(requestId, queryPlan, null, 10_000L, new HashMap<>(), null, false);
Assert.fail("Method call above should have failed");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Error executing query"));
@@ -161,7 +161,7 @@ public class QueryDispatcherTest extends QueryTestSet {
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
try {
// will throw b/c mailboxService is null
- dispatcher.submitAndReduce(requestId, queryPlan, null, 10_000L, new HashMap<>(), null);
+ dispatcher.submitAndReduce(requestId, queryPlan, null, 10_000L, new HashMap<>(), null, false);
Assert.fail("Method call above should have failed");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Error executing query"));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org