You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2018/12/13 02:02:17 UTC

[incubator-pinot] 01/01: Adding a class to get statistics about a request (query) processed by Pinot

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch request-processing-info
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 74d4dcb9da103ea506cb0f142daf665f3b9b24ab
Author: Subbu Subramaniam <ss...@linkedin.com>
AuthorDate: Tue Dec 11 20:15:31 2018 -0800

    Adding a class to get statistics about a request (query) processed by Pinot
    
    This will enable us to publish such statistics about each request to a stream.
    This stream can eventually be consumed by Pinot, and data from the stream
    analyzed on a per-table basis.
---
 .../pinot/broker/api/RequestStatistics.java        | 171 +++++++++++++++++++++
 .../broker/api/resources/PinotClientRequest.java   |   5 +-
 .../requesthandler/BaseBrokerRequestHandler.java   |  23 ++-
 .../requesthandler/BrokerRequestHandler.java       |   4 +-
 4 files changed, 199 insertions(+), 4 deletions(-)

diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/RequestStatistics.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/RequestStatistics.java
new file mode 100644
index 0000000..02a0ba7
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/RequestStatistics.java
@@ -0,0 +1,171 @@
+/**
+ * Copyright (C) 2014-2015 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.pinot.broker.api;
+
+import com.linkedin.pinot.common.request.BrokerRequest;
+import com.linkedin.pinot.common.request.QueryType;
+import com.linkedin.pinot.common.response.BrokerResponse;
+
+
+/**
+ * A class to hold the details regarding a request and the statistics.
+ * This object can be used to publish the query processing statistics to a stream for
+ * post-processing at a finer level than metrics.
+ */
+public class RequestStatistics {
+  private int _errorCode = 0; // stays 0 unless setErrorCode() is called
+  private String _pql;
+  private String _tableName = "NotYetParsed"; // placeholder until setTableName() is called
+  private long _processingTimeMillis = -1; // stays -1 unless setQueryProcessingTime() is called
+  // Counters below are copied from a BrokerResponse by setStatistics(BrokerResponse).
+  private long _totalDocs;
+  private long _numDocsScanned;
+  private long _numEntriesScannedInFilter;
+  private long _numEntriesScannedPostFilter;
+  private long _numSegmentsQueried;
+  private long _numSegmentsProcessed;
+  private long _numSegmentsMatched;
+  private int _numServersQueried;
+  private int _numServersResponded;
+  private boolean _isNumGroupsLimitReached;
+  private int _numExceptions;
+  private boolean _isGroupBy; // from BrokerRequest.isSetGroupBy()
+  private boolean _isAggregation; // from BrokerRequest.isSetAggregationsInfo()
+
+  public enum FanoutType {
+    OFFLINE,
+    REALTIME,
+    HYBRID
+  }
+
+  private FanoutType _fanoutType; // null until setFanoutType() is called
+
+  public RequestStatistics() {
+  }
+
+  public void setErrorCode(int errorCode) {
+    _errorCode = errorCode;
+  }
+
+  public void setPql(String pql) {
+    _pql = pql;
+  }
+
+  public void setTableName(String tableName) {
+    _tableName = tableName;
+  }
+
+  public void setQueryProcessingTime(long processingTimeMillis) {
+    _processingTimeMillis = processingTimeMillis;
+  }
+
+  /** Copies the execution counters reported by the given broker response into this object. */
+  public void setStatistics(BrokerResponse brokerResponse) {
+    _totalDocs = brokerResponse.getTotalDocs();
+    _numDocsScanned = brokerResponse.getNumDocsScanned();
+    _numEntriesScannedInFilter = brokerResponse.getNumEntriesScannedInFilter();
+    _numEntriesScannedPostFilter = brokerResponse.getNumEntriesScannedPostFilter();
+    _numSegmentsQueried = brokerResponse.getNumSegmentsQueried();
+    _numSegmentsProcessed = brokerResponse.getNumSegmentsProcessed();
+    _numSegmentsMatched = brokerResponse.getNumSegmentsMatched();
+    _numServersQueried = brokerResponse.getNumServersQueried();
+    _numServersResponded = brokerResponse.getNumServersResponded();
+    _isNumGroupsLimitReached = brokerResponse.isNumGroupsLimitReached();
+    _numExceptions = brokerResponse.getExceptionsSize();
+  }
+
+  public void setStatistics(BrokerRequest brokerRequest) { // records the group-by / aggregation flags
+    _isGroupBy = brokerRequest.isSetGroupBy();
+    _isAggregation = brokerRequest.isSetAggregationsInfo();
+  }
+
+  public void setFanoutType(FanoutType fanoutType) {
+    _fanoutType = fanoutType;
+  }
+
+  public FanoutType getFanoutType() {
+    return _fanoutType;
+  }
+
+  public boolean isAggregation() {
+    return _isAggregation;
+  }
+
+  public boolean isGroupBy() {
+    return _isGroupBy;
+  }
+
+  public int getErrorCode() {
+    return _errorCode;
+  }
+
+  public String getPql() {
+    return _pql;
+  }
+
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public long getProcessingTimeMillis() {
+    return _processingTimeMillis;
+  }
+
+  public long getTotalDocs() {
+    return _totalDocs;
+  }
+
+  public long getNumDocsScanned() {
+    return _numDocsScanned;
+  }
+
+  public long getNumEntriesScannedInFilter() {
+    return _numEntriesScannedInFilter;
+  }
+
+  public long getNumEntriesScannedPostFilter() {
+    return _numEntriesScannedPostFilter;
+  }
+
+  public long getNumSegmentsQueried() {
+    return _numSegmentsQueried;
+  }
+
+  public long getNumSegmentsProcessed() {
+    return _numSegmentsProcessed;
+  }
+
+  public long getNumSegmentsMatched() {
+    return _numSegmentsMatched;
+  }
+
+  public int getNumServersQueried() {
+    return _numServersQueried;
+  }
+
+  public int getNumServersResponded() {
+    return _numServersResponded;
+  }
+
+  public boolean isNumGroupsLimitReached() {
+    return _isNumGroupsLimitReached;
+  }
+
+  public int getNumExceptions() {
+    return _numExceptions;
+  }
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
index 5b2bc6e..eccd43a 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.broker.api.resources;
 
+import com.linkedin.pinot.broker.api.RequestStatistics;
 import com.linkedin.pinot.broker.requesthandler.BrokerRequestHandler;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
@@ -74,7 +75,7 @@ public class PinotClientRequest {
       if (debugOptions != null) {
         requestJson.put(DEBUG_OPTIONS, debugOptions);
       }
-      BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null);
+      BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics());
       return brokerResponse.toJsonString();
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing GET request", e);
@@ -94,7 +95,7 @@ public class PinotClientRequest {
   public String processQueryPost(String query) {
     try {
       JSONObject requestJson = new JSONObject(query);
-      BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null);
+      BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics());
       return brokerResponse.toJsonString();
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing GET request", e);
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index c06c3ff..456a7bf 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.requesthandler;
 
 import com.google.common.base.Splitter;
+import com.linkedin.pinot.broker.api.RequestStatistics;
 import com.linkedin.pinot.broker.api.RequesterIdentity;
 import com.linkedin.pinot.broker.broker.AccessControlFactory;
 import com.linkedin.pinot.broker.queryquota.TableQueryQuotaManager;
@@ -105,11 +106,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   }
 
   @Override
-  public BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity requesterIdentity)
+  public BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity requesterIdentity,
+      RequestStatistics requestStatistics)
       throws Exception {
     long requestId = _requestIdGenerator.incrementAndGet();
     String query = request.getString(PQL);
     LOGGER.debug("Query string for request {}: {}", requestId, query);
+    requestStatistics.setPql(query);
 
     // Compile the request
     long compilationStartTimeNs = System.nanoTime();
@@ -119,10 +122,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     } catch (Exception e) {
       LOGGER.info("Caught exception while compiling request {}: {}, {}", requestId, query, e.getMessage());
       _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1);
+      requestStatistics.setErrorCode(QueryException.PQL_PARSING_ERROR_CODE);
       return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR, e));
     }
     String tableName = brokerRequest.getQuerySource().getTableName();
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    requestStatistics.setTableName(rawTableName);
+    requestStatistics.setStatistics(brokerRequest);
     long compilationEndTimeNs = System.nanoTime();
     _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION,
         compilationEndTimeNs - compilationStartTimeNs);
@@ -133,6 +139,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     if (!hasAccess) {
       _brokerMetrics.addMeteredTableValue(brokerRequest.getQuerySource().getTableName(),
           BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
+      requestStatistics.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
       return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
     }
     _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
@@ -166,6 +173,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     if ((offlineTableName == null) && (realtimeTableName == null)) {
       // No table matches the request
       LOGGER.info("No table matches for request {}: {}", requestId, query);
+      requestStatistics.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
       _brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 1);
       return BrokerResponseNative.NO_TABLE_RESULT;
     }
@@ -175,6 +183,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       String errorMessage =
           String.format("Request %d exceeds query quota for table:%s, query:%s", requestId, tableName, query);
       LOGGER.info(errorMessage);
+      requestStatistics.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
       _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
       return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage));
     }
@@ -184,6 +193,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       validateRequest(brokerRequest);
     } catch (Exception e) {
       LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage());
+      requestStatistics.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
       _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
       return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e));
     }
@@ -228,6 +238,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     Map<String, List<String>> offlineRoutingTable = null;
     Map<String, List<String>> realtimeRoutingTable = null;
     if (offlineBrokerRequest != null) {
+      if (realtimeBrokerRequest == null) {
+        requestStatistics.setFanoutType(RequestStatistics.FanoutType.OFFLINE);
+      }
       offlineRoutingTable = _routingTable.getRoutingTable(new RoutingTableLookupRequest(offlineBrokerRequest));
       if (offlineRoutingTable.isEmpty()) {
         LOGGER.debug("No OFFLINE server found for request {}: {}", requestId, query);
@@ -236,6 +249,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       }
     }
     if (realtimeBrokerRequest != null) {
+      if (offlineBrokerRequest == null) {
+        requestStatistics.setFanoutType(RequestStatistics.FanoutType.REALTIME);
+      }
       realtimeRoutingTable = _routingTable.getRoutingTable(new RoutingTableLookupRequest(realtimeBrokerRequest));
       if (realtimeRoutingTable.isEmpty()) {
         LOGGER.debug("No REALTIME server found for request {}: {}", requestId, query);
@@ -243,6 +259,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
         realtimeRoutingTable = null;
       }
     }
+    if (offlineBrokerRequest != null && realtimeBrokerRequest != null) {
+      requestStatistics.setFanoutType(RequestStatistics.FanoutType.HYBRID);
+    }
     if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
       LOGGER.info("No server found for request {}: {}", requestId, query);
       _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NO_SERVER_FOUND_EXCEPTIONS, 1);
@@ -269,6 +288,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     // Set total query processing time
     long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs);
     brokerResponse.setTimeUsedMs(totalTimeMs);
+    requestStatistics.setQueryProcessingTime(totalTimeMs);
+    requestStatistics.setStatistics(brokerResponse);
 
     LOGGER.debug("Broker Response: {}", brokerResponse);
 
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java
index 4513741..5cc2bd9 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.broker.requesthandler;
 
+import com.linkedin.pinot.broker.api.RequestStatistics;
 import com.linkedin.pinot.broker.api.RequesterIdentity;
 import com.linkedin.pinot.common.response.BrokerResponse;
 import javax.annotation.Nullable;
@@ -29,5 +30,6 @@ public interface BrokerRequestHandler {
 
   void shutDown();
 
-  BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity requesterIdentity) throws Exception;
+  BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity requesterIdentity,
+      RequestStatistics requestStatistics) throws Exception;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org