You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2023/05/10 06:04:43 UTC

[pinot] branch master updated: Add request id to the V2 broker response (#10706)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f5030530f Add request id to the V2 broker response (#10706)
4f5030530f is described below

commit 4f5030530f55e9a08ad9e3cdbf8a5d96319bbb57
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed May 10 11:34:36 2023 +0530

    Add request id to the V2 broker response (#10706)
    
    * Add request id to the V2 broker response
    
    * Add unit test
    
    ---------
    
    Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
 .../MultiStageBrokerRequestHandler.java            |  1 +
 .../MultiStageBrokerRequestHandlerTest.java        | 92 ++++++++++++++++++++++
 .../response/broker/BrokerResponseNativeV2.java    | 25 ++++--
 3 files changed, 111 insertions(+), 7 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 860592392f..ffc72c0732 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -223,6 +223,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
         sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs));
     brokerResponse.setTimeUsedMs(totalTimeMs);
     brokerResponse.setResultTable(queryResults);
+    brokerResponse.setRequestId(String.valueOf(requestId));
 
     for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) {
       if (entry.getKey() == 0) {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
new file mode 100644
index 0000000000..c8ebfa0266
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.broker.AllowAllAccessControlFactory;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.DefaultRequestContext;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MultiStageBrokerRequestHandlerTest {
+  // Collaborators handed to the handler constructor in setUp(); the @Mock fields are filled in by openMocks().
+  private PinotConfiguration _config;
+  @Mock
+  private BrokerRoutingManager _routingManager;
+  // Not mocked: setUp() assigns an AllowAllAccessControlFactory.
+  private AccessControlFactory _accessControlFactory;
+  @Mock
+  private QueryQuotaManager _queryQuotaManager;
+  @Mock
+  private TableCache _tableCache;
+
+  @Mock
+  private BrokerMetrics _brokerMetrics;
+  // Handler under test; created in setUp() and shut down in tearDown().
+  private MultiStageBrokerRequestHandler _requestHandler;
+  // Builds the handler once, with only the broker timeout and query runner port configured.
+  @BeforeClass
+  public void setUp() {
+    MockitoAnnotations.openMocks(this);
+    _config = new PinotConfiguration();
+    _config.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, "10000");
+    _config.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, "12345");
+    _accessControlFactory = new AllowAllAccessControlFactory();
+    _requestHandler =
+        new MultiStageBrokerRequestHandler(_config, "testBrokerId", _routingManager, _accessControlFactory,
+            _queryQuotaManager, _tableCache, _brokerMetrics);
+  }
+  // Handles the same query twice and checks the id on the request context after each call.
+  @Test
+  public void testSetRequestId()
+      throws Exception {
+    String sampleSqlQuery = "SELECT * FROM testTable";
+    String sampleJsonRequest = String.format("{\"sql\":\"%s\"}", sampleSqlQuery);
+    ObjectMapper objectMapper = new ObjectMapper();
+    JsonNode jsonRequest = objectMapper.readTree(sampleJsonRequest);
+    RequestContext requestContext = new DefaultRequestContext();
+  // NOTE(review): expecting 1 assumes ids start at 1 on a fresh handler that nothing else has used; confirm.
+    _requestHandler.handleRequest(jsonRequest, null, null, requestContext);
+    long expectedRequestId = 1L;
+    Assert.assertEquals(requestContext.getRequestId(), expectedRequestId, "Request ID should be set correctly");
+  // The same context is reused; its id is expected to be one higher after the second call.
+    _requestHandler.handleRequest(jsonRequest, null, null, requestContext);
+    expectedRequestId += 1L;
+    Assert.assertEquals(requestContext.getRequestId(), expectedRequestId, "Request ID should be set correctly");
+  }
+  // Shuts down the handler created in setUp().
+  @AfterClass
+  public void tearDown() {
+    _requestHandler.shutDown();
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 2eacbc23aa..8465d47f81 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -37,15 +37,17 @@ import org.apache.pinot.spi.utils.JsonUtils;
  * Supports serialization via JSON.
  */
 @JsonPropertyOrder({
-    "resultTable", "stageStats", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried",
-    "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
-    "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
-    "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
-    "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
-    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "segmentStatistics",
-    "traceInfo"
+    "resultTable", "requestId", "stageStats", "exceptions", "numServersQueried", "numServersResponded",
+    "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
+    "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
+    "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
+    "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
+    "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
+    "realtimeTotalCpuTimeNs", "segmentStatistics", "traceInfo"
 })
 public class BrokerResponseNativeV2 extends BrokerResponseNative {
+  private String _requestId;
+
   private final Map<Integer, BrokerResponseStats> _stageIdStats = new HashMap<>();
 
   public BrokerResponseNativeV2() {
@@ -91,4 +93,13 @@ public class BrokerResponseNativeV2 extends BrokerResponseNative {
   public Map<Integer, BrokerResponseStats> getStageIdStats() {
     return _stageIdStats;
   }
+
+  @JsonProperty("requestId")
+  public String getRequestId() { // value last given to setRequestId(); the field has no initializer
+    return _requestId;
+  }
+
+  public void setRequestId(String requestId) { // stored as-is, with no validation or conversion
+    _requestId = requestId;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org