You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2018/12/13 02:02:17 UTC
[incubator-pinot] 01/01: Adding a class to get statistics about a
request (query) processed by Pinot
This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch request-processing-info
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 74d4dcb9da103ea506cb0f142daf665f3b9b24ab
Author: Subbu Subramaniam <ss...@linkedin.com>
AuthorDate: Tue Dec 11 20:15:31 2018 -0800
Adding a class to get statistics about a request (query) processed by Pinot
This will enable us to publish such statistics about each request to a stream.
This stream can eventually be consumed by Pinot, and data from the stream
analyzed on a per-table basis.
---
.../pinot/broker/api/RequestStatistics.java | 171 +++++++++++++++++++++
.../broker/api/resources/PinotClientRequest.java | 5 +-
.../requesthandler/BaseBrokerRequestHandler.java | 23 ++-
.../requesthandler/BrokerRequestHandler.java | 4 +-
4 files changed, 199 insertions(+), 4 deletions(-)
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/RequestStatistics.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/RequestStatistics.java
new file mode 100644
index 0000000..02a0ba7
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/RequestStatistics.java
@@ -0,0 +1,171 @@
+/**
+ * Copyright (C) 2014-2015 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.pinot.broker.api;
+
+import com.linkedin.pinot.common.request.BrokerRequest;
+import com.linkedin.pinot.common.request.QueryType;
+import com.linkedin.pinot.common.response.BrokerResponse;
+
+
+/**
+ * A class to hold the details regarding a request and the statistics.
+ * This object can be used to publish the query processing statistics to a stream for
+ * post-processing at a finer level than metrics.
+ */
+public class RequestStatistics {
+ private int _errorCode = 0;
+ private String _pql;
+ private String _tableName = "NotYetParsed";
+ private long _processingTimeMillis = -1;
+
+ private long _totalDocs;
+ private long _numDocsScanned;
+ private long _numEntriesScannedInFilter;
+ private long _numEntriesScannedPostFilter;
+ private long _numSegmentsQueried;
+ private long _numSegmentsProcessed;
+ private long _numSegmentsMatched;
+ private int _numServersQueried;
+ private int _numServersResponded;
+ private boolean _isNumGroupsLimitReached;
+ private int _numExceptions;
+ private boolean _isGroupBy;
+ private boolean _isAggregation;
+
+ public enum FanoutType {
+ OFFLINE,
+ REALTIME,
+ HYBRID
+ }
+
+ private FanoutType _fanoutType;
+
+ public RequestStatistics() {
+ }
+
+ public void setErrorCode(int errorCode) {
+ _errorCode = errorCode;
+ }
+
+ public void setPql(String pql) {
+ _pql = pql;
+ }
+
+ public void setTableName(String tableName) {
+ _tableName = tableName;
+ }
+
+ public void setQueryProcessingTime(long processingTimeMillis) {
+ _processingTimeMillis = processingTimeMillis;
+ }
+
+ public void setStatistics(BrokerResponse brokerResponse) {
+ _totalDocs = brokerResponse.getTotalDocs();
+ _numDocsScanned = brokerResponse.getNumDocsScanned();
+ _numEntriesScannedInFilter = brokerResponse.getNumEntriesScannedInFilter();
+ _numEntriesScannedPostFilter = brokerResponse.getNumEntriesScannedPostFilter();
+ _numSegmentsQueried = brokerResponse.getNumSegmentsQueried();
+ _numSegmentsProcessed = brokerResponse.getNumSegmentsProcessed();
+ _numSegmentsMatched = brokerResponse.getNumSegmentsMatched();
+ _numServersQueried = brokerResponse.getNumServersQueried();
+ _numSegmentsProcessed = brokerResponse.getNumSegmentsProcessed();
+ _numServersResponded = brokerResponse.getNumServersResponded();
+ _isNumGroupsLimitReached = brokerResponse.isNumGroupsLimitReached();
+ _numExceptions = brokerResponse.getExceptionsSize();
+ }
+
+ public void setStatistics(BrokerRequest brokerRequest) {
+ _isGroupBy = brokerRequest.isSetGroupBy();
+ _isAggregation = brokerRequest.isSetAggregationsInfo();
+ }
+
+ public void setFanoutType(FanoutType fanoutType) {
+ _fanoutType = fanoutType;
+ }
+
+ public FanoutType getFanoutType() {
+ return _fanoutType;
+ }
+
+ public boolean isAggregation() {
+ return _isAggregation;
+ }
+
+ public boolean isGroupBy() {
+ return _isGroupBy;
+ }
+
+ public int getErrorCode() {
+ return _errorCode;
+ }
+
+ public String getPql() {
+ return _pql;
+ }
+
+ public String getTableName() {
+ return _tableName;
+ }
+
+ public long getProcessingTimeMillis() {
+ return _processingTimeMillis;
+ }
+
+ public long getTotalDocs() {
+ return _totalDocs;
+ }
+
+ public long getNumDocsScanned() {
+ return _numDocsScanned;
+ }
+
+ public long getNumEntriesScannedInFilter() {
+ return _numEntriesScannedInFilter;
+ }
+
+ public long getNumEntriesScannedPostFilter() {
+ return _numEntriesScannedPostFilter;
+ }
+
+ public long getNumSegmentsQueried() {
+ return _numSegmentsQueried;
+ }
+
+ public long getNumSegmentsProcessed() {
+ return _numSegmentsProcessed;
+ }
+
+ public long getNumSegmentsMatched() {
+ return _numSegmentsMatched;
+ }
+
+ public int getNumServersQueried() {
+ return _numServersQueried;
+ }
+
+ public int getNumServersResponded() {
+ return _numServersResponded;
+ }
+
+ public boolean isNumGroupsLimitReached() {
+ return _isNumGroupsLimitReached;
+ }
+
+ public int getNumExceptions() {
+ return _numExceptions;
+ }
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
index 5b2bc6e..eccd43a 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/api/resources/PinotClientRequest.java
@@ -15,6 +15,7 @@
*/
package com.linkedin.pinot.broker.api.resources;
+import com.linkedin.pinot.broker.api.RequestStatistics;
import com.linkedin.pinot.broker.requesthandler.BrokerRequestHandler;
import com.linkedin.pinot.common.metrics.BrokerMeter;
import com.linkedin.pinot.common.metrics.BrokerMetrics;
@@ -74,7 +75,7 @@ public class PinotClientRequest {
if (debugOptions != null) {
requestJson.put(DEBUG_OPTIONS, debugOptions);
}
- BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null);
+ BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics());
return brokerResponse.toJsonString();
} catch (Exception e) {
LOGGER.error("Caught exception while processing GET request", e);
@@ -94,7 +95,7 @@ public class PinotClientRequest {
public String processQueryPost(String query) {
try {
JSONObject requestJson = new JSONObject(query);
- BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null);
+ BrokerResponse brokerResponse = requestHandler.handleRequest(requestJson, null, new RequestStatistics());
return brokerResponse.toJsonString();
} catch (Exception e) {
LOGGER.error("Caught exception while processing GET request", e);
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index c06c3ff..456a7bf 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -16,6 +16,7 @@
package com.linkedin.pinot.broker.requesthandler;
import com.google.common.base.Splitter;
+import com.linkedin.pinot.broker.api.RequestStatistics;
import com.linkedin.pinot.broker.api.RequesterIdentity;
import com.linkedin.pinot.broker.broker.AccessControlFactory;
import com.linkedin.pinot.broker.queryquota.TableQueryQuotaManager;
@@ -105,11 +106,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
@Override
- public BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity requesterIdentity)
+ public BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity requesterIdentity,
+ RequestStatistics requestStatistics)
throws Exception {
long requestId = _requestIdGenerator.incrementAndGet();
String query = request.getString(PQL);
LOGGER.debug("Query string for request {}: {}", requestId, query);
+ requestStatistics.setPql(query);
// Compile the request
long compilationStartTimeNs = System.nanoTime();
@@ -119,10 +122,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
} catch (Exception e) {
LOGGER.info("Caught exception while compiling request {}: {}, {}", requestId, query, e.getMessage());
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1);
+ requestStatistics.setErrorCode(QueryException.PQL_PARSING_ERROR_CODE);
return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR, e));
}
String tableName = brokerRequest.getQuerySource().getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ requestStatistics.setTableName(rawTableName);
+ requestStatistics.setStatistics(brokerRequest);
long compilationEndTimeNs = System.nanoTime();
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION,
compilationEndTimeNs - compilationStartTimeNs);
@@ -133,6 +139,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
if (!hasAccess) {
_brokerMetrics.addMeteredTableValue(brokerRequest.getQuerySource().getTableName(),
BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
+ requestStatistics.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
}
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
@@ -166,6 +173,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
if ((offlineTableName == null) && (realtimeTableName == null)) {
// No table matches the request
LOGGER.info("No table matches for request {}: {}", requestId, query);
+ requestStatistics.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 1);
return BrokerResponseNative.NO_TABLE_RESULT;
}
@@ -175,6 +183,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
String errorMessage =
String.format("Request %d exceeds query quota for table:%s, query:%s", requestId, tableName, query);
LOGGER.info(errorMessage);
+ requestStatistics.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage));
}
@@ -184,6 +193,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
validateRequest(brokerRequest);
} catch (Exception e) {
LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage());
+ requestStatistics.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e));
}
@@ -228,6 +238,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
Map<String, List<String>> offlineRoutingTable = null;
Map<String, List<String>> realtimeRoutingTable = null;
if (offlineBrokerRequest != null) {
+ if (realtimeBrokerRequest == null) {
+ requestStatistics.setFanoutType(RequestStatistics.FanoutType.OFFLINE);
+ }
offlineRoutingTable = _routingTable.getRoutingTable(new RoutingTableLookupRequest(offlineBrokerRequest));
if (offlineRoutingTable.isEmpty()) {
LOGGER.debug("No OFFLINE server found for request {}: {}", requestId, query);
@@ -236,6 +249,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
}
if (realtimeBrokerRequest != null) {
+ if (offlineBrokerRequest == null) {
+ requestStatistics.setFanoutType(RequestStatistics.FanoutType.REALTIME);
+ }
realtimeRoutingTable = _routingTable.getRoutingTable(new RoutingTableLookupRequest(realtimeBrokerRequest));
if (realtimeRoutingTable.isEmpty()) {
LOGGER.debug("No REALTIME server found for request {}: {}", requestId, query);
@@ -243,6 +259,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
realtimeRoutingTable = null;
}
}
+ if (offlineBrokerRequest != null && realtimeBrokerRequest != null) {
+ requestStatistics.setFanoutType(RequestStatistics.FanoutType.HYBRID);
+ }
if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
LOGGER.info("No server found for request {}: {}", requestId, query);
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NO_SERVER_FOUND_EXCEPTIONS, 1);
@@ -269,6 +288,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
// Set total query processing time
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs);
brokerResponse.setTimeUsedMs(totalTimeMs);
+ requestStatistics.setQueryProcessingTime(totalTimeMs);
+ requestStatistics.setStatistics(brokerResponse);
LOGGER.debug("Broker Response: {}", brokerResponse);
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java
index 4513741..5cc2bd9 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -15,6 +15,7 @@
*/
package com.linkedin.pinot.broker.requesthandler;
+import com.linkedin.pinot.broker.api.RequestStatistics;
import com.linkedin.pinot.broker.api.RequesterIdentity;
import com.linkedin.pinot.common.response.BrokerResponse;
import javax.annotation.Nullable;
@@ -29,5 +30,6 @@ public interface BrokerRequestHandler {
void shutDown();
- BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity requesterIdentity) throws Exception;
+ BrokerResponse handleRequest(JSONObject request, @Nullable RequesterIdentity requesterIdentity,
+ RequestStatistics requestStatistics) throws Exception;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org