You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/26 18:24:07 UTC

[pinot] branch master updated: Add brokerId and brokerReduceTimeMs to the broker response stats (#11142)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a86ba9c42f Add brokerId and brokerReduceTimeMs to the broker response stats (#11142)
a86ba9c42f is described below

commit a86ba9c42fa94703988a477620966eb5f31f6d89
Author: Shounak kulkarni <sh...@gmail.com>
AuthorDate: Wed Jul 26 23:54:01 2023 +0530

    Add brokerId and brokerReduceTimeMs to the broker response stats (#11142)
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  2 ++
 .../requesthandler/GrpcBrokerRequestHandler.java   |  7 ++--
 .../MultiStageBrokerRequestHandler.java            | 32 +++++++++--------
 .../org/apache/pinot/client/BrokerResponse.java    |  6 ++++
 .../org/apache/pinot/client/ExecutionStats.java    |  6 ++++
 .../pinot/common/response/BrokerResponse.java      | 14 ++++++++
 .../response/broker/BrokerResponseNative.java      | 41 +++++++++++++++++-----
 .../response/broker/BrokerResponseStats.java       | 16 ++++-----
 .../query/service/dispatch/QueryDispatcher.java    | 12 ++++---
 .../service/dispatch/QueryDispatcherTest.java      | 10 ++++--
 10 files changed, 108 insertions(+), 38 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 5921435aab..4a8ed8f3e7 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -265,6 +265,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     }
 
     brokerResponse.setRequestId(String.valueOf(requestId));
+    brokerResponse.setBrokerId(_brokerId);
+    brokerResponse.setBrokerReduceTimeMs(requestContext.getReduceTimeMillis());
     return brokerResponse;
   }
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 95d17c955b..56be2ea4fb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -103,8 +103,11 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
       sendRequest(requestId, TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap,
           requestContext.isSampledRequest());
     }
-    return _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs,
-        _brokerMetrics);
+    final long startReduceTimeNanos = System.nanoTime();
+    BrokerResponseNative brokerResponse = _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest,
+        responseMap, timeoutMs, _brokerMetrics);
+    requestContext.setReduceTimeNanos(System.nanoTime() - startReduceTimeNanos);
+    return brokerResponse;
   }
 
   /**
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 27e48e2ed7..5e030919fa 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -145,12 +145,18 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     }
     String query = sql.asText();
     requestContext.setQuery(query);
-    return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext);
+    BrokerResponse brokerResponse = handleRequest(sqlNodeAndOptions, request, requesterIdentity, requestContext);
+
+    brokerResponse.setRequestId(String.valueOf(requestId));
+    brokerResponse.setBrokerId(_brokerId);
+    brokerResponse.setBrokerReduceTimeMs(requestContext.getReduceTimeMillis());
+    return brokerResponse;
   }
 
-  private BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
-      JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
-      throws Exception {
+  private BrokerResponse handleRequest(@Nullable SqlNodeAndOptions sqlNodeAndOptions,
+      JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) {
+    final String query = requestContext.getQuery();
+    final long requestId = requestContext.getRequestId();
     LOGGER.debug("SQL query for request {}: {}", requestId, query);
 
     long compilationStartTimeNs;
@@ -168,7 +174,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
           queryPlanResult = _queryEnvironment.explainQuery(query, sqlNodeAndOptions, requestId);
           String plan = queryPlanResult.getExplainPlan();
           Set<String> tableNames = queryPlanResult.getTableNames();
-          if (!hasTableAccess(requesterIdentity, tableNames, requestId, requestContext)) {
+          if (!hasTableAccess(requesterIdentity, tableNames, requestContext)) {
             throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
           }
 
@@ -194,13 +200,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     updatePhaseTimingForTables(tableNames, BrokerQueryPhase.REQUEST_COMPILATION, compilationTimeNs);
 
     // Validate table access.
-    if (!hasTableAccess(requesterIdentity, tableNames, requestId, requestContext)) {
+    if (!hasTableAccess(requesterIdentity, tableNames, requestContext)) {
       throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
     }
     updatePhaseTimingForTables(tableNames, BrokerQueryPhase.AUTHORIZATION, System.nanoTime() - compilationEndTimeNs);
 
     // Validate QPS quota
-    if (hasExceededQPSQuota(tableNames, requestId, requestContext)) {
+    if (hasExceededQPSQuota(tableNames, requestContext)) {
       String errorMessage = String.format("Request %d: %s exceeds query quota.", requestId, query);
       return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage));
     }
@@ -217,7 +223,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
 
     long executionStartTimeNs = System.nanoTime();
     try {
-      queryResults = _queryDispatcher.submitAndReduce(requestId, dispatchableSubPlan, _mailboxService,
+      queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, _mailboxService,
           _reducerScheduler,
           queryTimeoutMs, sqlNodeAndOptions.getOptions(), stageIdStatsMap, traceEnabled);
     } catch (Throwable t) {
@@ -234,7 +240,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
         sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs));
     brokerResponse.setTimeUsedMs(totalTimeMs);
     brokerResponse.setResultTable(queryResults);
-    brokerResponse.setRequestId(String.valueOf(requestId));
 
     for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) {
       if (entry.getKey() == 0) {
@@ -263,26 +268,25 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
   /**
    * Validates whether the requester has access to all the tables.
    */
-  private boolean hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames, long requestId,
+  private boolean hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames,
       RequestContext requestContext) {
     boolean hasAccess = _accessControlFactory.create().hasAccess(requesterIdentity, tableNames);
     if (!hasAccess) {
       _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
-      LOGGER.warn("Access denied for requestId {}", requestId);
+      LOGGER.warn("Access denied for requestId {}", requestContext.getRequestId());
       requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
       return false;
     }
-
     return true;
   }
 
   /**
    * Returns true if the QPS quota of the tables has exceeded.
    */
-  private boolean hasExceededQPSQuota(Set<String> tableNames, long requestId, RequestContext requestContext) {
+  private boolean hasExceededQPSQuota(Set<String> tableNames, RequestContext requestContext) {
     for (String tableName : tableNames) {
       if (!_queryQuotaManager.acquire(tableName)) {
-        LOGGER.warn("Request {}: query exceeds quota for table: {}", requestId, tableName);
+        LOGGER.warn("Request {}: query exceeds quota for table: {}", requestContext.getRequestId(), tableName);
         requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
         String rawTableName = TableNameBuilder.extractRawTableName(tableName);
         _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
index c8cd0b8ce3..6c04097b5c 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.JsonNode;
  */
 public class BrokerResponse {
   private String _requestId;
+  private String _brokerId;
   private JsonNode _aggregationResults;
   private JsonNode _selectionResults;
   private JsonNode _resultTable;
@@ -37,6 +38,7 @@ public class BrokerResponse {
 
   private BrokerResponse(JsonNode brokerResponse) {
     _requestId = brokerResponse.get("requestId") != null ? brokerResponse.get("requestId").asText() : "unknown";
+    _brokerId = brokerResponse.get("brokerId") != null ? brokerResponse.get("brokerId").asText() : "unknown";
     _aggregationResults = brokerResponse.get("aggregationResults");
     _exceptions = brokerResponse.get("exceptions");
     _selectionResults = brokerResponse.get("selectionResults");
@@ -87,4 +89,8 @@ public class BrokerResponse {
   public String getRequestId() {
     return _requestId;
   }
+
+  public String getBrokerId() {
+    return _brokerId;
+  }
 }
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
index c66ca39b98..dc6bfd6b03 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ExecutionStats.java
@@ -44,6 +44,7 @@ public class ExecutionStats {
   private static final String MIN_CONSUMING_FRESHNESS_TIME_MS = "minConsumingFreshnessTimeMs";
   private static final String TOTAL_DOCS = "totalDocs";
   private static final String NUM_GROUPS_LIMIT_REACHED = "numGroupsLimitReached";
+  private static final String BROKER_REDUCE_TIME_MS = "brokerReduceTimeMs";
   private static final String TIME_USED_MS = "timeUsedMs";
 
   private final JsonNode _brokerResponse;
@@ -113,6 +114,10 @@ public class ExecutionStats {
     return _brokerResponse.has(TIME_USED_MS) ? _brokerResponse.get(TIME_USED_MS).asLong() : -1L;
   }
 
+  public long getBrokerReduceTimeMs() {
+    return _brokerResponse.has(BROKER_REDUCE_TIME_MS) ? _brokerResponse.get(BROKER_REDUCE_TIME_MS).asLong() : -1L;
+  }
+
   @Override
   public String toString() {
     Map<String, Object> map = new HashMap<>();
@@ -128,6 +133,7 @@ public class ExecutionStats {
     map.put(MIN_CONSUMING_FRESHNESS_TIME_MS, getMinConsumingFreshnessTimeMs() + "ms");
     map.put(TOTAL_DOCS, getTotalDocs());
     map.put(NUM_GROUPS_LIMIT_REACHED, isNumGroupsLimitReached());
+    map.put(BROKER_REDUCE_TIME_MS, getBrokerReduceTimeMs() + "ms");
     map.put(TIME_USED_MS, getTimeUsedMs() + "ms");
     return map.toString();
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 3ad49460bc..89fcc8c04b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -335,4 +335,18 @@ public interface BrokerResponse {
    * set request ID generated by broker
    */
   void setRequestId(String requestId);
+
+  /**
+   * get broker ID of the processing broker
+   */
+  String getBrokerId();
+
+  /**
+   * set broker ID of the processing broker
+   */
+  void setBrokerId(String requestId);
+
+  long getBrokerReduceTimeMs();
+
+  void setBrokerReduceTimeMs(long brokerReduceTimeMs);
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index f2580b9cc3..3e9345c25f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -41,14 +41,13 @@ import org.apache.pinot.spi.utils.JsonUtils;
  * Supports serialization via JSON.
  */
 @JsonPropertyOrder({
-    "resultTable", "requestId", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried",
-    "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
-    "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
-    "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
-    "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
-    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "segmentStatistics",
-    "traceInfo"
-})
+    "resultTable", "requestId", "brokerId", "exceptions", "numServersQueried", "numServersResponded",
+    "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
+    "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
+    "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
+    "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
+    "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
+    "realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "segmentStatistics", "traceInfo"})
 public class BrokerResponseNative implements BrokerResponse {
   public static final BrokerResponseNative EMPTY_RESULT = BrokerResponseNative.empty();
   public static final BrokerResponseNative NO_TABLE_RESULT =
@@ -58,6 +57,7 @@ public class BrokerResponseNative implements BrokerResponse {
   public static final BrokerResponseNative BROKER_ONLY_EXPLAIN_PLAN_OUTPUT = getBrokerResponseExplainPlanOutput();
 
   private String _requestId;
+  private String _brokerId;
   private int _numServersQueried = 0;
   private int _numServersResponded = 0;
   private long _numDocsScanned = 0L;
@@ -72,6 +72,7 @@ public class BrokerResponseNative implements BrokerResponse {
   // the timestamp indicating the freshness of the data queried in consuming segments.
   // This can be ingestion timestamp if provided by the stream, or the last index time
   private long _minConsumingFreshnessTimeMs = 0L;
+  private long _brokerReduceTimeMs = 0L;
 
   private long _totalDocs = 0L;
   private boolean _numGroupsLimitReached = false;
@@ -570,4 +571,28 @@ public class BrokerResponseNative implements BrokerResponse {
   public void setRequestId(String requestId) {
     _requestId = requestId;
   }
+
+  @JsonProperty("brokerId")
+  @Override
+  public String getBrokerId() {
+    return _brokerId;
+  }
+
+  @JsonProperty("brokerId")
+  @Override
+  public void setBrokerId(String requestId) {
+    _brokerId = requestId;
+  }
+
+  @JsonProperty("brokerReduceTimeMs")
+  @Override
+  public long getBrokerReduceTimeMs() {
+    return _brokerReduceTimeMs;
+  }
+
+  @JsonProperty("brokerReduceTimeMs")
+  @Override
+  public void setBrokerReduceTimeMs(long brokerReduceTimeMs) {
+    _brokerReduceTimeMs = brokerReduceTimeMs;
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
index f2361a7908..db23034efc 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
@@ -33,14 +33,14 @@ import org.apache.pinot.spi.utils.JsonUtils;
 //  same metadataKey
 // TODO: Replace member fields with a simple map of <MetadataKey, Object>
 // TODO: Add a subStat field, stage level subStats will contain each operator stats
-@JsonPropertyOrder({"requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "stageExecutionUnit",
-    "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded", "numSegmentsQueried",
-    "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
-    "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
-    "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
-    "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
-    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
-    "traceInfo", "operatorStats", "tableNames"})
+@JsonPropertyOrder({"brokerId", "requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs",
+    "stageExecutionUnit", "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded",
+    "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
+    "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
+    "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
+    "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
+    "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
+    "realtimeTotalCpuTimeNs", "brokerReduceTimeMs", "traceInfo", "operatorStats", "tableNames"})
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public class BrokerResponseStats extends BrokerResponseNative {
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index e5ca95cbbe..9eff0bc6c4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -64,6 +64,7 @@ import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
 import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
 import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
@@ -86,18 +87,21 @@ public class QueryDispatcher {
         new TracedThreadFactory(Thread.NORM_PRIORITY, false, PINOT_BROKER_QUERY_DISPATCHER_FORMAT));
   }
 
-  public ResultTable submitAndReduce(long requestId, DispatchableSubPlan dispatchableSubPlan,
+  public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan,
       MailboxService mailboxService, OpChainSchedulerService scheduler, long timeoutMs,
       Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator,
       boolean traceEnabled)
       throws Exception {
+    final long requestId = context.getRequestId();
     try {
       // submit all the distributed stages.
       int reduceStageId = submit(requestId, dispatchableSubPlan, timeoutMs, queryOptions);
       // run reduce stage and return result.
-      return runReducer(requestId, dispatchableSubPlan, reduceStageId, timeoutMs, mailboxService, scheduler,
-          executionStatsAggregator,
-          traceEnabled);
+      long reduceStartTimeInNanos = System.nanoTime();
+      ResultTable resultTable = runReducer(requestId, dispatchableSubPlan, reduceStageId, timeoutMs, mailboxService,
+          scheduler, executionStatsAggregator, traceEnabled);
+      context.setReduceTimeNanos(System.nanoTime() - reduceStartTimeInNanos);
+      return resultTable;
     } catch (Exception e) {
       cancel(requestId, dispatchableSubPlan);
       throw new RuntimeException("Error executing query: "
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 3cd3c839fb..ab85a15d60 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -38,6 +38,8 @@ import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.service.QueryServer;
 import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.trace.DefaultRequestContext;
+import org.apache.pinot.spi.trace.RequestContext;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -138,8 +140,10 @@ public class QueryDispatcherTest extends QueryTestSet {
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
     QueryDispatcher dispatcher = new QueryDispatcher();
     long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
+    RequestContext context = new DefaultRequestContext();
+    context.setRequestId(requestId);
     try {
-      dispatcher.submitAndReduce(requestId, dispatchableSubPlan, null, null, 10_000L, new HashMap<>(), null, false);
+      dispatcher.submitAndReduce(context, dispatchableSubPlan, null, null, 10_000L, new HashMap<>(), null, false);
       Assert.fail("Method call above should have failed");
     } catch (Exception e) {
       Assert.assertTrue(e.getMessage().contains("Error executing query"));
@@ -160,9 +164,11 @@ public class QueryDispatcherTest extends QueryTestSet {
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
     QueryDispatcher dispatcher = new QueryDispatcher();
     long requestId = RANDOM_REQUEST_ID_GEN.nextLong();
+    RequestContext context = new DefaultRequestContext();
+    context.setRequestId(requestId);
     try {
       // will throw b/c mailboxService is null
-      dispatcher.submitAndReduce(requestId, dispatchableSubPlan, null, null, 10_000L, new HashMap<>(), null, false);
+      dispatcher.submitAndReduce(context, dispatchableSubPlan, null, null, 10_000L, new HashMap<>(), null, false);
       Assert.fail("Method call above should have failed");
     } catch (Exception e) {
       System.out.println("e = " + e);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org