You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2019/05/21 17:56:15 UTC

[incubator-pinot] 01/01: Emit freshnessLag metric from broker for queries hitting consuming segments

This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch consumingStats
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit a5066fbe5ebd2e22311db4c37d818e8227e98d97
Author: Sunitha Beeram <sb...@sbeeram-ld2.linkedin.biz>
AuthorDate: Tue May 21 10:55:51 2019 -0700

    Emit freshnessLag metric from broker for queries hitting consuming segments
---
 .../requesthandler/BaseBrokerRequestHandler.java     |  6 ++++--
 .../org/apache/pinot/common/metrics/BrokerMeter.java |  3 +++
 .../apache/pinot/common/response/BrokerResponse.java |  4 ++++
 .../common/response/broker/BrokerResponseNative.java |  1 -
 .../query/executor/ServerQueryExecutorV1Impl.java    |  4 +++-
 .../pinot/core/query/reduce/BrokerReduceService.java | 20 ++++++++++++++++----
 .../pinot/core/query/scheduler/QueryScheduler.java   | 12 +++++++++---
 7 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 7bc689f..5f503dd 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -308,12 +308,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
       // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
       LOGGER.info(
-          "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} "
-              + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
+          "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched/consuming):{}/{}/{}/{},"
+              + " consumingFreshnessTimeMs:{},"
+              + " servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
           brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(),
           brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(),
           brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(),
           brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(),
+          brokerResponse.getNumConsumingSegmentsQueried(), brokerResponse.getMinConsumingFreshnessTimeMs(),
           brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
           brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
           StringUtils.substring(query, 0, _queryLogLength));
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 170e745..facd15f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -63,6 +63,9 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
   ENTRIES_SCANNED_IN_FILTER("documents", false),
   ENTRIES_SCANNED_POST_FILTER("documents", false),
 
+  // metric tracking the freshness lag for consuming segments
+  FRESHNESS_LAG_MS("latency", false),
+
   REQUEST_CONNECTION_TIMEOUTS("timeouts", false),
   HELIX_ZOOKEEPER_RECONNECTS("reconnects", true),
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 72b95e2..0533318 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -97,6 +97,10 @@ public interface BrokerResponse {
    */
   long getNumSegmentsMatched();
 
+  long getNumConsumingSegmentsQueried();
+
+  long getMinConsumingFreshnessTimeMs();
+
   /**
    * Get total number of documents within the table hit.
    */
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 214d942..1ccedd6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -227,7 +227,6 @@ public class BrokerResponseNative implements BrokerResponse {
     _minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs;
   }
 
-
   @JsonProperty("totalDocs")
   @Override
   public long getTotalDocs() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 3058539..5f70dc2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -170,11 +170,13 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
       }
     }
 
-    long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+    long minConsumingFreshnessTimeMs = -1;
     if (numConsumingSegmentsQueried > 0) {
       if (minIngestionTimeMs == Long.MAX_VALUE) {
         LOGGER.debug("Did not find valid ingestionTimestamp across consuming segments! Using indexTime instead");
         minConsumingFreshnessTimeMs = minIndexTimeMs;
+      } else {
+        minConsumingFreshnessTimeMs = minIngestionTimeMs;
       }
       LOGGER.debug("Querying {} consuming segments with min minConsumingFreshnessTimeMs {}", numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index d539bf7..dde809e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -64,6 +64,7 @@ import org.slf4j.LoggerFactory;
 @ThreadSafe
 public class BrokerReduceService implements ReduceService<BrokerResponseNative> {
   private static final Logger LOGGER = LoggerFactory.getLogger(BrokerReduceService.class);
+  private static final long INVALID_FRESHNESS = -1;
 
   @Nonnull
   @Override
@@ -143,9 +144,13 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
         numConsumingSegmentsQueried += Long.parseLong(numConsumingString);
       }
 
-      String minConsumingIndexTsString = metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
-      if (minConsumingIndexTsString != null) {
-        minConsumingFreshnessTimeMs = Math.min(Long.parseLong(minConsumingIndexTsString), minConsumingFreshnessTimeMs);
+      String minConsumingFreshnessTimeMsString = metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
+      if (minConsumingFreshnessTimeMsString != null) {
+        long freshness = Long.parseLong(minConsumingFreshnessTimeMsString);
+        // ignore invalid values
+        if (freshness > INVALID_FRESHNESS) {
+          minConsumingFreshnessTimeMs = Math.min(freshness, minConsumingFreshnessTimeMs);
+        }
       }
 
       String numTotalRawDocsString = metadata.get(DataTable.TOTAL_DOCS_METADATA_KEY);
@@ -182,7 +187,9 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
     brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached);
     if (numConsumingSegmentsQueried > 0) {
       if (minConsumingFreshnessTimeMs == Long.MAX_VALUE) {
-        LOGGER.error("Invalid lastIndexedTimestamp across {} consuming segments", numConsumingSegmentsQueried);
+        LOGGER.error("Invalid freshness time across {} consuming segments", numConsumingSegmentsQueried);
+        // use the invalid value (-1) for clear logging
+        minConsumingFreshnessTimeMs = INVALID_FRESHNESS;
       }
       brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsQueried);
       brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs);
@@ -197,6 +204,11 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
           .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter);
       brokerMetrics
           .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter);
+
+      if (numConsumingSegmentsQueried > 0 && minConsumingFreshnessTimeMs > 0) {
+        brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.FRESHNESS_LAG_MS,
+            System.currentTimeMillis() - minConsumingFreshnessTimeMs);
+      }
     }
 
     // Parse the option from request whether to preserve the type
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 627cda7..dcd5ee6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -54,6 +54,7 @@ public abstract class QueryScheduler {
 
   private static final String INVALID_NUM_SCANNED = "-1";
   private static final String INVALID_SEGMENTS_COUNT = "-1";
+  private static final String INVALID_FRESHNESS_MS = "-1";
   private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond";
   private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
 
@@ -173,6 +174,10 @@ public abstract class QueryScheduler {
         Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_PROCESSED, INVALID_SEGMENTS_COUNT));
     long numSegmentsMatched =
         Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_MATCHED, INVALID_SEGMENTS_COUNT));
+    long numSegmentsConsuming =
+        Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_QUERIED, INVALID_SEGMENTS_COUNT));
+    long minConsumingFreshnessMs =
+        Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, INVALID_FRESHNESS_MS));
 
     if (numDocsScanned > 0) {
       serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
@@ -192,10 +197,11 @@ public abstract class QueryScheduler {
 
     if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) {
       LOGGER.info(
-          "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
-          requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, schedulerWaitMs,
+          "Processed requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},minConsumingFreshnessMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
+          requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming, schedulerWaitMs,
           timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
-          timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(),
+          timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), minConsumingFreshnessMs,
+          queryRequest.getBrokerId(),
           numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
 
       // Limit the dropping log message at most once per second.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org