You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/26 18:24:07 UTC
[pinot] branch master updated: Add brokerId and brokerReduceTimeMs to the broker response stats (#11142)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a86ba9c42f Add brokerId and brokerReduceTimeMs to the broker response stats (#11142)
a86ba9c42f is described below
commit a86ba9c42fa94703988a477620966eb5f31f6d89
Author: Shounak kulkarni <sh...@gmail.com>
AuthorDate: Wed Jul 26 23:54:01 2023 +0530
Add brokerId and brokerReduceTimeMs to the broker response stats (#11142)
---
.../requesthandler/BaseBrokerRequestHandler.java | 2 ++
.../requesthandler/GrpcBrokerRequestHandler.java | 7 ++--
.../MultiStageBrokerRequestHandler.java | 32 +++++++++--------
.../org/apache/pinot/client/BrokerResponse.java | 6 ++++
.../org/apache/pinot/client/ExecutionStats.java | 6 ++++
.../pinot/common/response/BrokerResponse.java | 14 ++++++++
.../response/broker/BrokerResponseNative.java | 41 +++++++++++++++++-----
.../response/broker/BrokerResponseStats.java | 16 ++++-----
.../query/service/dispatch/QueryDispatcher.java | 12 ++++---
.../service/dispatch/QueryDispatcherTest.java | 10 ++++--
10 files changed, 108 insertions(+), 38 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 5921435aab..4a8ed8f3e7 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -265,6 +265,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
brokerResponse.setRequestId(String.valueOf(requestId));
+ brokerResponse.setBrokerId(_brokerId);
+ brokerResponse.setBrokerReduceTimeMs(requestContext.getReduceTimeMillis());
return brokerResponse;
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 95d17c955b..56be2ea4fb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -103,8 +103,11 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
sendRequest(requestId, TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap,
requestContext.isSampledRequest());
}
- return _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs,
- _brokerMetrics);
+ final long startReduceTimeNanos = System.nanoTime();
+ BrokerResponseNative brokerResponse = _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest,
+ responseMap, timeoutMs, _brokerMetrics);
+ requestContext.setReduceTimeNanos(System.nanoTime() - startReduceTimeNanos);
+ return brokerResponse;
}
/**
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 27e48e2ed7..5e030919fa 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -145,12 +145,18 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
}
String query = sql.asText();
requestContext.setQuery(query);
- return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext);
+ BrokerResponse brokerResponse = handleRequest(sqlNodeAndOptions, request, requesterIdentity, requestContext);
+
+ brokerResponse.setRequestId(String.valueOf(requestId));
+ brokerResponse.setBrokerId(_brokerId);
+ brokerResponse.setBrokerReduceTimeMs(requestContext.getReduceTimeMillis());
+ return brokerResponse;
}
- private BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
- JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
- throws Exception {
+ private BrokerResponse handleRequest(@Nullable SqlNodeAndOptions sqlNodeAndOptions,
+ JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) {
+ final String query = requestContext.getQuery();
+ final long requestId = requestContext.getRequestId();
LOGGER.debug("SQL query for request {}: {}", requestId, query);
long compilationStartTimeNs;
@@ -168,7 +174,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
queryPlanResult = _queryEnvironment.explainQuery(query, sqlNodeAndOptions, requestId);
String plan = queryPlanResult.getExplainPlan();
Set<String> tableNames = queryPlanResult.getTableNames();
- if (!hasTableAccess(requesterIdentity, tableNames, requestId, requestContext)) {
+ if (!hasTableAccess(requesterIdentity, tableNames, requestContext)) {
throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
}
@@ -194,13 +200,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
updatePhaseTimingForTables(tableNames, BrokerQueryPhase.REQUEST_COMPILATION, compilationTimeNs);
// Validate table access.
- if (!hasTableAccess(requesterIdentity, tableNames, requestId, requestContext)) {
+ if (!hasTableAccess(requesterIdentity, tableNames, requestContext)) {
throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
}
updatePhaseTimingForTables(tableNames, BrokerQueryPhase.AUTHORIZATION, System.nanoTime() - compilationEndTimeNs);
// Validate QPS quota
- if (hasExceededQPSQuota(tableNames, requestId, requestContext)) {
+ if (hasExceededQPSQuota(tableNames, requestContext)) {
String errorMessage = String.format("Request %d: %s exceeds query quota.", requestId, query);
return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage));
}
@@ -217,7 +223,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
long executionStartTimeNs = System.nanoTime();
try {
- queryResults = _queryDispatcher.submitAndReduce(requestId, dispatchableSubPlan, _mailboxService,
+ queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, _mailboxService,
_reducerScheduler,
queryTimeoutMs, sqlNodeAndOptions.getOptions(), stageIdStatsMap, traceEnabled);
} catch (Throwable t) {
@@ -234,7 +240,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs));
brokerResponse.setTimeUsedMs(totalTimeMs);
brokerResponse.setResultTable(queryResults);
- brokerResponse.setRequestId(String.valueOf(requestId));
for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) {
if (entry.getKey() == 0) {
@@ -263,26 +268,25 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
/**
* Validates whether the requester has access to all the tables.
*/
- private boolean hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames, long requestId,
+ private boolean hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames,
RequestContext requestContext) {
boolean hasAccess = _accessControlFactory.create().hasAccess(requesterIdentity, tableNames);
if (!hasAccess) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
- LOGGER.warn("Access denied for requestId {}", requestId);
+ LOGGER.warn("Access denied for requestId {}", requestContext.getRequestId());
requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
return false;
}
-
return true;
}
/**
* Returns true if the QPS quota of the tables has exceeded.
*/
- private boolean hasExceededQPSQuota(Set<String> tableNames, long requestId, RequestContext requestContext) {
+ private boolean hasExceededQPSQuota(Set<String> tableNames, RequestContext requestContext) {
for (String tableName : tableNames) {
if (!_queryQuotaManager.acquire(tableName)) {
- LOGGER.warn("Request {}: query exceeds quota for table: {}", requestId, tableName);
+ LOGGER.warn("Request {}: query exceeds quota for table: {}", requestContext.getRequestId(), tableName);
requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
index c8cd0b8ce3..6c04097b5c 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.JsonNode;
*/
public class BrokerResponse {
private String _requestId;
+ private String _brokerId;
private JsonNode _aggregationResults;
private JsonNode _selectionResults;
private JsonNode _resultTable;
@@ -37,6 +38,7 @@ public class BrokerResponse {
private BrokerResponse(JsonNode brokerResponse) {
_requestId = brokerResponse.get("requestId") != null ? brokerResponse.get("requestId").asText() : "unknown";
+ _brokerId = brokerResponse.get("brokerId") != null ? brokerResponse.get("brokerId").asText() : "unknown";
_aggregationResults = brokerResponse.get("aggregationResults");
_exceptions = brokerResponse.get("exceptions");
_selectionResults = brokerResponse.get("selectionResults");
@@ -87,4 +89,8 @@ public class BrokerResponse {
public String getRequestId() {
return _requestId;
}
+
+ public String getBrokerId() {
+ return _brokerId;
+ }
}
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
index c66ca39b98..dc6bfd6b03 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
@@ -44,6 +44,7 @@ public class ExecutionStats {
private static final String MIN_CONSUMING_FRESHNESS_TIME_MS = "minConsumingFreshnessTimeMs";
private static final String TOTAL_DOCS = "totalDocs";
private static final String NUM_GROUPS_LIMIT_REACHED = "numGroupsLimitReached";
+ private static final String BROKER_REDUCE_TIME_MS = "brokerReduceTimeMs";
private static final String TIME_USED_MS = "timeUsedMs";
private final JsonNode _brokerResponse;
@@ -113,6 +114,10 @@ public class ExecutionStats {
return _brokerResponse.has(TIME_USED_MS) ? _brokerResponse.get(TIME_USED_MS).asLong() : -1L;
}
+ public long getBrokerReduceTimeMs() {
+ return _brokerResponse.has(BROKER_REDUCE_TIME_MS) ? _brokerResponse.get(BROKER_REDUCE_TIME_MS).asLong() : -1L;
+ }
+
@Override
public String toString() {
Map<String, Object> map = new HashMap<>();
@@ -128,6 +133,7 @@ public class ExecutionStats {
map.put(MIN_CONSUMING_FRESHNESS_TIME_MS, getMinConsumingFreshnessTimeMs() + "ms");
map.put(TOTAL_DOCS, getTotalDocs());
map.put(NUM_GROUPS_LIMIT_REACHED, isNumGroupsLimitReached());
+ map.put(BROKER_REDUCE_TIME_MS, getBrokerReduceTimeMs() + "ms");
map.put(TIME_USED_MS, getTimeUsedMs() + "ms");
return map.toString();
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 3ad49460bc..89fcc8c04b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -335,4 +335,18 @@ public interface BrokerResponse {
* set request ID generated by broker
*/
void setRequestId(String requestId);
+
+ /**
+ * get broker ID of the processing broker
+ */
+ String getBrokerId();
+
+ /**
+ * set broker ID of the processing broker
+ */
+ void setBrokerId(String requestId);
+
+ long getBrokerReduceTimeMs();
+
+ void setBrokerReduceTimeMs(long brokerReduceTimeMs);
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index f2580b9cc3..3e9345c25f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -41,14 +41,13 @@ import org.apache.pinot.spi.utils.JsonUtils;
* Supports serialization via JSON.
*/
@JsonPropertyOrder({
- "resultTable", "requestId", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried",
- "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
- "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
- "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
- "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
- "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "segmentStatistics",
- "traceInfo"
-})
+ "resultTable", "requestId", "brokerId", "exceptions", "numServersQueried", "numServersResponded",
+ "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
+ "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
+ "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
+ "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
+ "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
+ "realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "segmentStatistics", "traceInfo"})
public class BrokerResponseNative implements BrokerResponse {
public static final BrokerResponseNative EMPTY_RESULT = BrokerResponseNative.empty();
public static final BrokerResponseNative NO_TABLE_RESULT =
@@ -58,6 +57,7 @@ public class BrokerResponseNative implements BrokerResponse {
public static final BrokerResponseNative BROKER_ONLY_EXPLAIN_PLAN_OUTPUT = getBrokerResponseExplainPlanOutput();
private String _requestId;
+ private String _brokerId;
private int _numServersQueried = 0;
private int _numServersResponded = 0;
private long _numDocsScanned = 0L;
@@ -72,6 +72,7 @@ public class BrokerResponseNative implements BrokerResponse {
// the timestamp indicating the freshness of the data queried in consuming segments.
// This can be ingestion timestamp if provided by the stream, or the last index time
private long _minConsumingFreshnessTimeMs = 0L;
+ private long _brokerReduceTimeMs = 0L;
private long _totalDocs = 0L;
private boolean _numGroupsLimitReached = false;
@@ -570,4 +571,28 @@ public class BrokerResponseNative implements BrokerResponse {
public void setRequestId(String requestId) {
_requestId = requestId;
}
+
+ @JsonProperty("brokerId")
+ @Override
+ public String getBrokerId() {
+ return _brokerId;
+ }
+
+ @JsonProperty("brokerId")
+ @Override
+ public void setBrokerId(String requestId) {
+ _brokerId = requestId;
+ }
+
+ @JsonProperty("brokerReduceTimeMs")
+ @Override
+ public long getBrokerReduceTimeMs() {
+ return _brokerReduceTimeMs;
+ }
+
+ @JsonProperty("brokerReduceTimeMs")
+ @Override
+ public void setBrokerReduceTimeMs(long brokerReduceTimeMs) {
+ _brokerReduceTimeMs = brokerReduceTimeMs;
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
index f2361a7908..db23034efc 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
@@ -33,14 +33,14 @@ import org.apache.pinot.spi.utils.JsonUtils;
// same metadataKey
// TODO: Replace member fields with a simple map of <MetadataKey, Object>
// TODO: Add a subStat field, stage level subStats will contain each operator stats
-@JsonPropertyOrder({"requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "stageExecutionUnit",
- "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded", "numSegmentsQueried",
- "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
- "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
- "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
- "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
- "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
- "traceInfo", "operatorStats", "tableNames"})
+@JsonPropertyOrder({"brokerId", "requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs",
+ "stageExecutionUnit", "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded",
+ "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
+ "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
+ "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
+ "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
+ "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
+ "realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "traceInfo", "operatorStats", "tableNames"})
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class BrokerResponseStats extends BrokerResponseNative {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index e5ca95cbbe..9eff0bc6c4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -64,6 +64,7 @@ import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
@@ -86,18 +87,21 @@ public class QueryDispatcher {
new TracedThreadFactory(Thread.NORM_PRIORITY, false, PINOT_BROKER_QUERY_DISPATCHER_FORMAT));
}
- public ResultTable submitAndReduce(long requestId, DispatchableSubPlan dispatchableSubPlan,
+ public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan,
MailboxService mailboxService, OpChainSchedulerService scheduler, long timeoutMs,
Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator,
boolean traceEnabled)
throws Exception {
+ final long requestId = context.getRequestId();
try {
// submit all the distributed stages.
int reduceStageId = submit(requestId, dispatchableSubPlan, timeoutMs, queryOptions);
// run reduce stage and return result.
- return runReducer(requestId, dispatchableSubPlan, reduceStageId, timeoutMs, mailboxService, scheduler,
- executionStatsAggregator,
- traceEnabled);
+ long reduceStartTimeInNanos = System.nanoTime();
+ ResultTable resultTable = runReducer(requestId, dispatchableSubPlan, reduceStageId, timeoutMs, mailboxService,
+ scheduler, executionStatsAggregator, traceEnabled);
+ context.setReduceTimeNanos(System.nanoTime() - reduceStartTimeInNanos);
+ return resultTable;
} catch (Exception e) {
cancel(requestId, dispatchableSubPlan);
throw new RuntimeException("Error executing query: "
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 3cd3c839fb..ab85a15d60 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -38,6 +38,8 @@ import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.service.QueryServer;
import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.trace.DefaultRequestContext;
+import org.apache.pinot.spi.trace.RequestContext;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -138,8 +140,10 @@ public class QueryDispatcherTest extends QueryTestSet {
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
QueryDispatcher dispatcher = new QueryDispatcher();
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
+ RequestContext context = new DefaultRequestContext();
+ context.setRequestId(requestId);
try {
- dispatcher.submitAndReduce(requestId, dispatchableSubPlan, null, null, 10_000L, new HashMap<>(), null, false);
+ dispatcher.submitAndReduce(context, dispatchableSubPlan, null, null, 10_000L, new HashMap<>(), null, false);
Assert.fail("Method call above should have failed");
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Error executing query"));
@@ -160,9 +164,11 @@ public class QueryDispatcherTest extends QueryTestSet {
DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
QueryDispatcher dispatcher = new QueryDispatcher();
long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
+ RequestContext context = new DefaultRequestContext();
+ context.setRequestId(requestId);
try {
// will throw b/c mailboxService is null
- dispatcher.submitAndReduce(requestId, dispatchableSubPlan, null, null, 10_000L, new HashMap<>(), null, false);
+ dispatcher.submitAndReduce(context, dispatchableSubPlan, null, null, 10_000L, new HashMap<>(), null, false);
Assert.fail("Method call above should have failed");
} catch (Exception e) {
System.out.println("e = " + e);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org