You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/23 06:49:31 UTC

[pinot] branch master updated: [#10884][feature] add requestId for BrokerResponse in pinot-broker and java-client (#10943)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a208607254 [#10884][feature] add requestId for BrokerResponse in pinot-broker and java-client (#10943)
a208607254 is described below

commit a2086072545fc67a35f5d6756c3b9efc97d15641
Author: mingmxu <mi...@gmail.com>
AuthorDate: Thu Jun 22 23:49:25 2023 -0700

    [#10884][feature] add requestId for BrokerResponse in pinot-broker and java-client (#10943)
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  1 +
 .../org/apache/pinot/client/BrokerResponse.java    |  6 +++
 .../apache/pinot/client/BrokerResponseTest.java    | 52 ++++++++++++++++++++++
 .../org/apache/pinot/client/aggregation.json       |  1 +
 .../apache/pinot/client/aggregationGroupBy.json    |  1 +
 .../org/apache/pinot/client/exception.json         |  1 +
 .../org/apache/pinot/client/selection.json         |  1 +
 .../pinot/common/response/BrokerResponse.java      | 10 +++++
 .../response/broker/BrokerResponseNative.java      | 15 ++++++-
 .../response/broker/BrokerResponseNativeV2.java    | 10 -----
 .../response/broker/BrokerResponseStats.java       |  2 +-
 11 files changed, 88 insertions(+), 12 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 63089e863c..406734812d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -262,6 +262,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       }
     }
 
+    brokerResponse.setRequestId(String.valueOf(requestId));
     return brokerResponse;
   }
 
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
index 8f06afab04..c8cd0b8ce3 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerResponse.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.JsonNode;
  * Reimplementation of BrokerResponse from pinot-common, so that pinot-api does not depend on pinot-common.
  */
 public class BrokerResponse {
+  private String _requestId;
   private JsonNode _aggregationResults;
   private JsonNode _selectionResults;
   private JsonNode _resultTable;
@@ -35,6 +36,7 @@ public class BrokerResponse {
   }
 
   private BrokerResponse(JsonNode brokerResponse) {
+    _requestId = brokerResponse.get("requestId") != null ? brokerResponse.get("requestId").asText() : "unknown";
     _aggregationResults = brokerResponse.get("aggregationResults");
     _exceptions = brokerResponse.get("exceptions");
     _selectionResults = brokerResponse.get("selectionResults");
@@ -81,4 +83,8 @@ public class BrokerResponse {
   static BrokerResponse empty() {
     return new BrokerResponse();
   }
+
+  public String getRequestId() {
+    return _requestId;
+  }
 }
diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/BrokerResponseTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/BrokerResponseTest.java
new file mode 100644
index 0000000000..219c7a0948
--- /dev/null
+++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/BrokerResponseTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class BrokerResponseTest { // Checks how the client-side BrokerResponse exposes the broker's "requestId" field.
+  private static final ObjectReader OBJECT_READER = JsonUtils.DEFAULT_READER;
+
+  @Test // A "requestId" present in the response JSON is returned as its text value.
+  public void parseResultWithRequestId()
+      throws JsonProcessingException {
+    String responseJson = "{\"requestId\":\"1\",\"traceInfo\":{},\"numDocsScanned\":36542,"
+        + "\"aggregationResults\":[{\"function\":\"count_star\",\"value\":\"36542\"}],\"timeUsedMs\":30,"
+        + "\"segmentStatistics\":[],\"exceptions\":[],\"totalDocs\":115545}";
+    BrokerResponse brokerResponse = BrokerResponse.fromJson(OBJECT_READER.readTree(responseJson));
+    Assert.assertEquals(brokerResponse.getRequestId(), "1"); // TestNG argument order is (actual, expected)
+    Assert.assertFalse(brokerResponse.hasExceptions());
+  }
+
+  @Test // A response JSON without "requestId" makes the client fall back to the literal "unknown".
+  public void parseResultWithoutRequestId()
+      throws JsonProcessingException {
+    String responseJson = "{\"traceInfo\":{},\"numDocsScanned\":36542,"
+        + "\"aggregationResults\":[{\"function\":\"count_star\",\"value\":\"36542\"}],\"timeUsedMs\":30,"
+        + "\"segmentStatistics\":[],\"exceptions\":[],\"totalDocs\":115545}";
+    BrokerResponse brokerResponse = BrokerResponse.fromJson(OBJECT_READER.readTree(responseJson));
+    Assert.assertEquals(brokerResponse.getRequestId(), "unknown"); // TestNG argument order is (actual, expected)
+    Assert.assertFalse(brokerResponse.hasExceptions());
+  }
+}
diff --git a/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/aggregation.json b/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/aggregation.json
index ff9a775e74..511b704049 100644
--- a/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/aggregation.json
+++ b/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/aggregation.json
@@ -1,4 +1,5 @@
 {
+  "requestId": "1",
   "traceInfo": {},
   "numDocsScanned": 36542,
   "aggregationResults": [
diff --git a/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/aggregationGroupBy.json b/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/aggregationGroupBy.json
index bfe0b26c25..b589cef4c0 100644
--- a/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/aggregationGroupBy.json
+++ b/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/aggregationGroupBy.json
@@ -1,4 +1,5 @@
 {
+  "requestId": "1",
   "traceInfo": {},
   "numDocsScanned": 22598,
   "aggregationResults": [
diff --git a/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/exception.json b/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/exception.json
index bc7c99ccb5..7a4e7e4fd5 100644
--- a/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/exception.json
+++ b/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/exception.json
@@ -1,4 +1,5 @@
 {
+  "requestId": "1",
   "traceInfo": {},
   "numDocsScanned": 0,
   "aggregationResults": [],
diff --git a/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/selection.json b/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/selection.json
index e3068dc263..7825e87620 100644
--- a/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/selection.json
+++ b/pinot-clients/pinot-java-client/src/test/resources/org/apache/pinot/client/selection.json
@@ -1,4 +1,5 @@
 {
+  "requestId": "1",
   "selectionResults": {
     "columns": [
       "ActualElapsedTime",
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index e50ea2b2ee..3ad49460bc 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -325,4 +325,14 @@ public interface BrokerResponse {
    * Set the total number of segments with a MatchAllFilterOperator when Explain Plan is called
    */
   void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments);
+
+  /**
+   * Returns the request ID of the query.
+   */
+  String getRequestId();
+
+  /**
+   * Sets the request ID generated by the broker.
+   */
+  void setRequestId(String requestId);
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 76ed75a466..f2580b9cc3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -41,7 +41,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
  * Supports serialization via JSON.
  */
 @JsonPropertyOrder({
-    "resultTable", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried",
+    "resultTable", "requestId", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried",
     "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
     "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
     "numGroupsLimitReached", "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
@@ -57,6 +57,7 @@ public class BrokerResponseNative implements BrokerResponse {
       new BrokerResponseNative(QueryException.TABLE_DOES_NOT_EXIST_ERROR);
   public static final BrokerResponseNative BROKER_ONLY_EXPLAIN_PLAN_OUTPUT = getBrokerResponseExplainPlanOutput();
 
+  private String _requestId;
   private int _numServersQueried = 0;
   private int _numServersResponded = 0;
   private long _numDocsScanned = 0L;
@@ -557,4 +558,16 @@ public class BrokerResponseNative implements BrokerResponse {
   public int getExceptionsSize() {
     return _processingExceptions.size();
   }
+
+  @JsonProperty("requestId")
+  @Override
+  public String getRequestId() {
+    return _requestId;
+  }
+
+  @JsonProperty("requestId")
+  @Override
+  public void setRequestId(String requestId) {
+    _requestId = requestId;
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 8465d47f81..95943881dc 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -46,7 +46,6 @@ import org.apache.pinot.spi.utils.JsonUtils;
     "realtimeTotalCpuTimeNs", "segmentStatistics", "traceInfo"
 })
 public class BrokerResponseNativeV2 extends BrokerResponseNative {
-  private String _requestId;
 
   private final Map<Integer, BrokerResponseStats> _stageIdStats = new HashMap<>();
 
@@ -93,13 +92,4 @@ public class BrokerResponseNativeV2 extends BrokerResponseNative {
   public Map<Integer, BrokerResponseStats> getStageIdStats() {
     return _stageIdStats;
   }
-
-  @JsonProperty("requestId")
-  public String getRequestId() {
-    return _requestId;
-  }
-
-  public void setRequestId(String requestId) {
-    _requestId = requestId;
-  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
index 23482e905c..f2361a7908 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
@@ -33,7 +33,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
 //  same metadataKey
 // TODO: Replace member fields with a simple map of <MetadataKey, Object>
 // TODO: Add a subStat field, stage level subStats will contain each operator stats
-@JsonPropertyOrder({"exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "stageExecutionUnit",
+@JsonPropertyOrder({"requestId", "exceptions", "numBlocks", "numRows", "stageExecutionTimeMs", "stageExecutionUnit",
     "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried", "numServersResponded", "numSegmentsQueried",
     "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
     "numConsumingSegmentsMatched", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org