You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2019/05/22 05:21:15 UTC
[incubator-pinot] branch master updated: Emit freshnessLag metric
from broker for queries hitting consuming segments (#4229)
This is an automated email from the ASF dual-hosted git repository.
sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 06baad6 Emit freshnessLag metric from broker for queries hitting consuming segments (#4229)
06baad6 is described below
commit 06baad647bfb04ac7c8c1796a7318edea47815d2
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Tue May 21 22:21:10 2019 -0700
Emit freshnessLag metric from broker for queries hitting consuming segments (#4229)
* Emit freshnessLag metric from broker for queries hitting consuming segments
* Address review comments
* Minor formatting fixes
---
.../broker/requesthandler/BaseBrokerRequestHandler.java | 6 ++++--
.../org/apache/pinot/common/metrics/BrokerTimer.java | 4 +++-
.../apache/pinot/common/response/BrokerResponse.java | 10 ++++++++++
.../common/response/broker/BrokerResponseNative.java | 6 +++++-
.../core/query/executor/ServerQueryExecutorV1Impl.java | 2 +-
.../pinot/core/query/reduce/BrokerReduceService.java | 16 ++++++++++------
.../pinot/core/query/scheduler/QueryScheduler.java | 17 ++++++++++++-----
7 files changed, 45 insertions(+), 16 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 7bc689f..89d3470 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -308,12 +308,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
// Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
LOGGER.info(
- "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} "
- + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
+ "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{},"
+ + " segments(queried/processed/matched/consuming):{}/{}/{}/{}, consumingFreshnessTimeMs:{},"
+ + " servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(),
brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(),
brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(),
brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(),
+ brokerResponse.getNumConsumingSegmentsQueried(), brokerResponse.getMinConsumingFreshnessTimeMs(),
brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
StringUtils.substring(query, 0, _queryLogLength));
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
index 694db60..6c98dab 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
@@ -27,7 +27,9 @@ import org.apache.pinot.common.Utils;
*/
public enum BrokerTimer implements AbstractMetrics.Timer {
ROUTING_TABLE_UPDATE_TIME(true),
- CLUSTER_CHANGE_QUEUE_TIME(true);
+ CLUSTER_CHANGE_QUEUE_TIME(true),
+ // metric tracking the freshness lag for consuming segments
+ FRESHNESS_LAG_MS(false);
private final String timerName;
private final boolean global;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 72b95e2..ec2960e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -98,6 +98,16 @@ public interface BrokerResponse {
long getNumSegmentsMatched();
/**
+ * Get number of consuming segments that were queried.
+ */
+ long getNumConsumingSegmentsQueried();
+
+ /**
+ * Get the minimum freshness timestamp across consuming segments that were queried
+ */
+ long getMinConsumingFreshnessTimeMs();
+
+ /**
* Get total number of documents within the table hit.
*/
long getTotalDocs();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 214d942..38ae0c3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -178,6 +178,7 @@ public class BrokerResponseNative implements BrokerResponse {
}
@JsonProperty("numSegmentsQueried")
+ @Override
public long getNumSegmentsQueried() {
return _numSegmentsQueried;
}
@@ -188,6 +189,7 @@ public class BrokerResponseNative implements BrokerResponse {
}
@JsonProperty("numSegmentsProcessed")
+ @Override
public long getNumSegmentsProcessed() {
return _numSegmentsProcessed;
}
@@ -198,6 +200,7 @@ public class BrokerResponseNative implements BrokerResponse {
}
@JsonProperty("numSegmentsMatched")
+ @Override
public long getNumSegmentsMatched() {
return _numSegmentsMatched;
}
@@ -208,6 +211,7 @@ public class BrokerResponseNative implements BrokerResponse {
}
@JsonProperty("numConsumingSegmentsQueried")
+ @Override
public long getNumConsumingSegmentsQueried() {
return _numConsumingSegmentsQueried;
}
@@ -218,6 +222,7 @@ public class BrokerResponseNative implements BrokerResponse {
}
@JsonProperty("minConsumingFreshnessTimeMs")
+ @Override
public long getMinConsumingFreshnessTimeMs() {
return _minConsumingFreshnessTimeMs;
}
@@ -227,7 +232,6 @@ public class BrokerResponseNative implements BrokerResponse {
_minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs;
}
-
@JsonProperty("totalDocs")
@Override
public long getTotalDocs() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 3058539..6b59e1a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -170,7 +170,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
}
}
- long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+ long minConsumingFreshnessTimeMs = minIngestionTimeMs;
if (numConsumingSegmentsQueried > 0) {
if (minIngestionTimeMs == Long.MAX_VALUE) {
LOGGER.debug("Did not find valid ingestionTimestamp across consuming segments! Using indexTime instead");
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index d539bf7..ca9d91f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -33,6 +34,7 @@ import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.BrokerTimer;
import org.apache.pinot.common.query.ReduceService;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.GroupBy;
@@ -143,9 +145,9 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
numConsumingSegmentsQueried += Long.parseLong(numConsumingString);
}
- String minConsumingIndexTsString = metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
- if (minConsumingIndexTsString != null) {
- minConsumingFreshnessTimeMs = Math.min(Long.parseLong(minConsumingIndexTsString), minConsumingFreshnessTimeMs);
+ String minConsumingFreshnessTimeMsString = metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
+ if (minConsumingFreshnessTimeMsString != null) {
+ minConsumingFreshnessTimeMs = Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), minConsumingFreshnessTimeMs);
}
String numTotalRawDocsString = metadata.get(DataTable.TOTAL_DOCS_METADATA_KEY);
@@ -181,9 +183,6 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
brokerResponseNative.setTotalDocs(numTotalRawDocs);
brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached);
if (numConsumingSegmentsQueried > 0) {
- if (minConsumingFreshnessTimeMs == Long.MAX_VALUE) {
- LOGGER.error("Invalid lastIndexedTimestamp across {} consuming segments", numConsumingSegmentsQueried);
- }
brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsQueried);
brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs);
}
@@ -197,6 +196,11 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
.addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter);
brokerMetrics
.addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter);
+
+ if (numConsumingSegmentsQueried > 0 && minConsumingFreshnessTimeMs > 0) {
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS,
+ System.currentTimeMillis() - minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS);
+ }
}
// Parse the option from request whether to preserve the type
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 627cda7..51e28ed 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -54,6 +54,7 @@ public abstract class QueryScheduler {
private static final String INVALID_NUM_SCANNED = "-1";
private static final String INVALID_SEGMENTS_COUNT = "-1";
+ private static final String INVALID_FRESHNESS_MS = "-1";
private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond";
private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
@@ -173,6 +174,10 @@ public abstract class QueryScheduler {
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_PROCESSED, INVALID_SEGMENTS_COUNT));
long numSegmentsMatched =
Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_MATCHED, INVALID_SEGMENTS_COUNT));
+ long numSegmentsConsuming =
+ Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_QUERIED, INVALID_SEGMENTS_COUNT));
+ long minConsumingFreshnessMs =
+ Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, INVALID_FRESHNESS_MS));
if (numDocsScanned > 0) {
serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
@@ -192,11 +197,13 @@ public abstract class QueryScheduler {
if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) {
LOGGER.info(
- "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
- requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, schedulerWaitMs,
- timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
- timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(),
- numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
+ "Processed requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{},"
+ + "schedulerWaitMs={},totalExecMs={},totalTimeMs={},minConsumingFreshnessMs={},broker={},"
+ + "numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
+ requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched,
+ numSegmentsConsuming, schedulerWaitMs, timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
+ timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), minConsumingFreshnessMs,
+ queryRequest.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
// Limit the dropping log message at most once per second.
if (numDroppedLogRateLimiter.tryAcquire()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org