You are viewing a plain text version of this content. The canonical link for it is not included in this plain text version.
Posted to commits@pinot.apache.org by kh...@apache.org on 2023/05/10 06:04:43 UTC
[pinot] branch master updated: Add request id to the V2 broker response (#10706)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4f5030530f Add request id to the V2 broker response (#10706)
4f5030530f is described below
commit 4f5030530f55e9a08ad9e3cdbf8a5d96319bbb57
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed May 10 11:34:36 2023 +0530
Add request id to the V2 broker response (#10706)
* Add request id to the V2 broker response
* Add unit test
---------
Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
.../MultiStageBrokerRequestHandler.java | 1 +
.../MultiStageBrokerRequestHandlerTest.java | 92 ++++++++++++++++++++++
.../response/broker/BrokerResponseNativeV2.java | 25 ++++--
3 files changed, 111 insertions(+), 7 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 860592392f..ffc72c0732 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -223,6 +223,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs));
brokerResponse.setTimeUsedMs(totalTimeMs);
brokerResponse.setResultTable(queryResults);
+ brokerResponse.setRequestId(String.valueOf(requestId));
for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) {
if (entry.getKey() == 0) {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
new file mode 100644
index 0000000000..c8ebfa0266
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.broker.AllowAllAccessControlFactory;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.DefaultRequestContext;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MultiStageBrokerRequestHandlerTest {
+
+ private PinotConfiguration _config;
+ @Mock
+ private BrokerRoutingManager _routingManager;
+
+ private AccessControlFactory _accessControlFactory;
+ @Mock
+ private QueryQuotaManager _queryQuotaManager;
+ @Mock
+ private TableCache _tableCache;
+
+ @Mock
+ private BrokerMetrics _brokerMetrics;
+
+ private MultiStageBrokerRequestHandler _requestHandler;
+
+ @BeforeClass
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ _config = new PinotConfiguration();
+ _config.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, "10000");
+ _config.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, "12345");
+ _accessControlFactory = new AllowAllAccessControlFactory();
+ _requestHandler =
+ new MultiStageBrokerRequestHandler(_config, "testBrokerId", _routingManager, _accessControlFactory,
+ _queryQuotaManager, _tableCache, _brokerMetrics);
+ }
+
+ @Test
+ public void testSetRequestId()
+ throws Exception {
+ String sampleSqlQuery = "SELECT * FROM testTable";
+ String sampleJsonRequest = String.format("{\"sql\":\"%s\"}", sampleSqlQuery);
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode jsonRequest = objectMapper.readTree(sampleJsonRequest);
+ RequestContext requestContext = new DefaultRequestContext();
+
+ _requestHandler.handleRequest(jsonRequest, null, null, requestContext);
+ long expectedRequestId = 1L;
+ Assert.assertEquals(requestContext.getRequestId(), expectedRequestId, "Request ID should be set correctly");
+
+ _requestHandler.handleRequest(jsonRequest, null, null, requestContext);
+ expectedRequestId += 1L;
+ Assert.assertEquals(requestContext.getRequestId(), expectedRequestId, "Request ID should be set correctly");
+ }
+
+ @AfterClass
+ public void tearDown() {
+ _requestHandler.shutDown();
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 2eacbc23aa..8465d47f81 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -37,15 +37,17 @@ import org.apache.pinot.spi.utils.JsonUtils;
* Supports serialization via JSON.
*/
@JsonPropertyOrder({
- "resultTable", "stageStats", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried",
- "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
- "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
- "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
- "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
- "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "segmentStatistics",
- "traceInfo"
+ "resultTable", "requestId", "stageStats", "exceptions", "numServersQueried", "numServersResponded",
+ "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
+ "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter",
+ "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
+ "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
+ "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
+ "realtimeTotalCpuTimeNs", "segmentStatistics", "traceInfo"
})
public class BrokerResponseNativeV2 extends BrokerResponseNative {
+ private String _requestId;
+
private final Map<Integer, BrokerResponseStats> _stageIdStats = new HashMap<>();
public BrokerResponseNativeV2() {
@@ -91,4 +93,13 @@ public class BrokerResponseNativeV2 extends BrokerResponseNative {
public Map<Integer, BrokerResponseStats> getStageIdStats() {
return _stageIdStats;
}
+
+ @JsonProperty("requestId")
+ public String getRequestId() {
+ return _requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ _requestId = requestId;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org