You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2019/05/21 17:56:15 UTC
[incubator-pinot] 01/01: Emit freshnessLag metric from broker for queries hitting consuming segments
This is an automated email from the ASF dual-hosted git repository.
sunithabeeram pushed a commit to branch consumingStats
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit a5066fbe5ebd2e22311db4c37d818e8227e98d97
Author: Sunitha Beeram <sb...@sbeeram-ld2.linkedin.biz>
AuthorDate: Tue May 21 10:55:51 2019 -0700
Emit freshnessLag metric from broker for queries hitting consuming segments
---
.../requesthandler/BaseBrokerRequestHandler.java | 6 ++++--
.../org/apache/pinot/common/metrics/BrokerMeter.java | 3 +++
.../apache/pinot/common/response/BrokerResponse.java | 4 ++++
.../common/response/broker/BrokerResponseNative.java | 1 -
.../query/executor/ServerQueryExecutorV1Impl.java | 4 +++-
.../pinot/core/query/reduce/BrokerReduceService.java | 20 ++++++++++++++++----
.../pinot/core/query/scheduler/QueryScheduler.java | 12 +++++++++---
7 files changed, 39 insertions(+), 11 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 7bc689f..5f503dd 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -308,12 +308,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
// Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
LOGGER.info(
- "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} "
- + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
+ "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched/consuming):{}/{}/{}/{},"
+ + " consumingFreshnessTimeMs:{},"
+ + " servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(),
brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(),
brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(),
brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(),
+ brokerResponse.getNumConsumingSegmentsQueried(), brokerResponse.getMinConsumingFreshnessTimeMs(),
brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
StringUtils.substring(query, 0, _queryLogLength));
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 170e745..facd15f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -63,6 +63,9 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
ENTRIES_SCANNED_IN_FILTER("documents", false),
ENTRIES_SCANNED_POST_FILTER("documents", false),
+ // metric tracking the freshness lag for consuming segments
+ FRESHNESS_LAG_MS("latency", false),
+
REQUEST_CONNECTION_TIMEOUTS("timeouts", false),
HELIX_ZOOKEEPER_RECONNECTS("reconnects", true),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 72b95e2..0533318 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -97,6 +97,10 @@ public interface BrokerResponse {
*/
long getNumSegmentsMatched();
+ long getNumConsumingSegmentsQueried();
+
+ long getMinConsumingFreshnessTimeMs();
+
/**
* Get total number of documents within the table hit.
*/
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 214d942..1ccedd6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -227,7 +227,6 @@ public class BrokerResponseNative implements BrokerResponse {
_minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs;
}
-
@JsonProperty("totalDocs")
@Override
public long getTotalDocs() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 3058539..5f70dc2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -170,11 +170,13 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
}
}
- long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+ long minConsumingFreshnessTimeMs = -1;
if (numConsumingSegmentsQueried > 0) {
if (minIngestionTimeMs == Long.MAX_VALUE) {
LOGGER.debug("Did not find valid ingestionTimestamp across consuming segments! Using indexTime instead");
minConsumingFreshnessTimeMs = minIndexTimeMs;
+ } else {
+ minConsumingFreshnessTimeMs = minIngestionTimeMs;
}
LOGGER.debug("Querying {} consuming segments with min minConsumingFreshnessTimeMs {}", numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index d539bf7..dde809e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -64,6 +64,7 @@ import org.slf4j.LoggerFactory;
@ThreadSafe
public class BrokerReduceService implements ReduceService<BrokerResponseNative> {
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerReduceService.class);
+ private static final long INVALID_FRESHNESS = -1;
@Nonnull
@Override
@@ -143,9 +144,13 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
numConsumingSegmentsQueried += Long.parseLong(numConsumingString);
}
- String minConsumingIndexTsString = metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
- if (minConsumingIndexTsString != null) {
- minConsumingFreshnessTimeMs = Math.min(Long.parseLong(minConsumingIndexTsString), minConsumingFreshnessTimeMs);
+ String minConsumingFreshnessTimeMsString = metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
+ if (minConsumingFreshnessTimeMsString != null) {
+ long freshness = Long.parseLong(minConsumingFreshnessTimeMsString);
+ // ignore invalid values
+ if (freshness > INVALID_FRESHNESS) {
+ minConsumingFreshnessTimeMs = Math.min(freshness, minConsumingFreshnessTimeMs);
+ }
}
String numTotalRawDocsString = metadata.get(DataTable.TOTAL_DOCS_METADATA_KEY);
@@ -182,7 +187,9 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached);
if (numConsumingSegmentsQueried > 0) {
if (minConsumingFreshnessTimeMs == Long.MAX_VALUE) {
- LOGGER.error("Invalid lastIndexedTimestamp across {} consuming segments", numConsumingSegmentsQueried);
+ LOGGER.error("Invalid freshness time across {} consuming segments", numConsumingSegmentsQueried);
+ // use the invalid value (-1) for clear logging
+ minConsumingFreshnessTimeMs = INVALID_FRESHNESS;
}
brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsQueried);
brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs);
@@ -197,6 +204,11 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
.addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter);
brokerMetrics
.addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter);
+
+ if (numConsumingSegmentsQueried > 0 && minConsumingFreshnessTimeMs > 0) {
+ brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.FRESHNESS_LAG_MS,
+ System.currentTimeMillis() - minConsumingFreshnessTimeMs);
+ }
}
// Parse the option from request whether to preserve the type
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 627cda7..dcd5ee6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -54,6 +54,7 @@ public abstract class QueryScheduler {
private static final String INVALID_NUM_SCANNED = "-1";
private static final String INVALID_SEGMENTS_COUNT = "-1";
+ private static final String INVALID_FRESHNESS_MS = "-1";
private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond";
private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
@@ -173,6 +174,10 @@ public abstract class QueryScheduler {
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_PROCESSED, INVALID_SEGMENTS_COUNT));
long numSegmentsMatched =
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_MATCHED, INVALID_SEGMENTS_COUNT));
+ long numSegmentsConsuming =
+ Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_QUERIED, INVALID_SEGMENTS_COUNT));
+ long minConsumingFreshnessMs =
+ Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, INVALID_FRESHNESS_MS));
if (numDocsScanned > 0) {
serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
@@ -192,10 +197,11 @@ public abstract class QueryScheduler {
if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) {
LOGGER.info(
- "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
- requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, schedulerWaitMs,
+ "Processed requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},minConsumingFreshnessMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
+ requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming, schedulerWaitMs,
timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
- timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(),
+ timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), minConsumingFreshnessMs,
+ queryRequest.getBrokerId(),
numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
// Limit the dropping log message at most once per second.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org