You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2019/05/22 05:21:15 UTC

[incubator-pinot] branch master updated: Emit freshnessLag metric from broker for queries hitting consuming segments (#4229)

This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 06baad6  Emit freshnessLag metric from broker for queries hitting consuming segments (#4229)
06baad6 is described below

commit 06baad647bfb04ac7c8c1796a7318edea47815d2
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Tue May 21 22:21:10 2019 -0700

    Emit freshnessLag metric from broker for queries hitting consuming segments (#4229)
    
    * Emit freshnessLag metric from broker for queries hitting consuming segments
    
    * Address review comments
    
    * Minor formatting fixes
---
 .../broker/requesthandler/BaseBrokerRequestHandler.java |  6 ++++--
 .../org/apache/pinot/common/metrics/BrokerTimer.java    |  4 +++-
 .../apache/pinot/common/response/BrokerResponse.java    | 10 ++++++++++
 .../common/response/broker/BrokerResponseNative.java    |  6 +++++-
 .../core/query/executor/ServerQueryExecutorV1Impl.java  |  2 +-
 .../pinot/core/query/reduce/BrokerReduceService.java    | 16 ++++++++++------
 .../pinot/core/query/scheduler/QueryScheduler.java      | 17 ++++++++++++-----
 7 files changed, 45 insertions(+), 16 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 7bc689f..89d3470 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -308,12 +308,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
       // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
       LOGGER.info(
-          "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} "
-              + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
+          "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{},"
+              + " segments(queried/processed/matched/consuming):{}/{}/{}/{}, consumingFreshnessTimeMs:{},"
+              + " servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
           brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(),
           brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(),
           brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(),
           brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(),
+          brokerResponse.getNumConsumingSegmentsQueried(), brokerResponse.getMinConsumingFreshnessTimeMs(),
           brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
           brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
           StringUtils.substring(query, 0, _queryLogLength));
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
index 694db60..6c98dab 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
@@ -27,7 +27,9 @@ import org.apache.pinot.common.Utils;
  */
 public enum BrokerTimer implements AbstractMetrics.Timer {
   ROUTING_TABLE_UPDATE_TIME(true),
-  CLUSTER_CHANGE_QUEUE_TIME(true);
+  CLUSTER_CHANGE_QUEUE_TIME(true),
+  // Per-table metric tracking the freshness lag (in ms) of the consuming segments hit by a query
+  FRESHNESS_LAG_MS(false);
 
   private final String timerName;
   private final boolean global;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 72b95e2..ec2960e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -98,6 +98,16 @@ public interface BrokerResponse {
   long getNumSegmentsMatched();
 
   /**
+   * Get number of consuming segments that were queried.
+   */
+  long getNumConsumingSegmentsQueried();
+
+  /**
+   * Get the minimum freshness timestamp (epoch millis) across consuming segments that were queried.
+   */
+  long getMinConsumingFreshnessTimeMs();
+
+  /**
    * Get total number of documents within the table hit.
    */
   long getTotalDocs();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 214d942..38ae0c3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -178,6 +178,7 @@ public class BrokerResponseNative implements BrokerResponse {
   }
 
   @JsonProperty("numSegmentsQueried")
+  @Override
   public long getNumSegmentsQueried() {
     return _numSegmentsQueried;
   }
@@ -188,6 +189,7 @@ public class BrokerResponseNative implements BrokerResponse {
   }
 
   @JsonProperty("numSegmentsProcessed")
+  @Override
   public long getNumSegmentsProcessed() {
     return _numSegmentsProcessed;
   }
@@ -198,6 +200,7 @@ public class BrokerResponseNative implements BrokerResponse {
   }
 
   @JsonProperty("numSegmentsMatched")
+  @Override
   public long getNumSegmentsMatched() {
     return _numSegmentsMatched;
   }
@@ -208,6 +211,7 @@ public class BrokerResponseNative implements BrokerResponse {
   }
 
   @JsonProperty("numConsumingSegmentsQueried")
+  @Override
   public long getNumConsumingSegmentsQueried() {
     return _numConsumingSegmentsQueried;
   }
@@ -218,6 +222,7 @@ public class BrokerResponseNative implements BrokerResponse {
   }
 
   @JsonProperty("minConsumingFreshnessTimeMs")
+  @Override
   public long getMinConsumingFreshnessTimeMs() {
     return _minConsumingFreshnessTimeMs;
   }
@@ -227,7 +232,6 @@ public class BrokerResponseNative implements BrokerResponse {
     _minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs;
   }
 
-
   @JsonProperty("totalDocs")
   @Override
   public long getTotalDocs() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 3058539..6b59e1a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -170,7 +170,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
       }
     }
 
-    long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+    long minConsumingFreshnessTimeMs = minIngestionTimeMs;
     if (numConsumingSegmentsQueried > 0) {
       if (minIngestionTimeMs == Long.MAX_VALUE) {
         LOGGER.debug("Did not find valid ingestionTimestamp across consuming segments! Using indexTime instead");
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index d539bf7..ca9d91f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -33,6 +34,7 @@ import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.BrokerTimer;
 import org.apache.pinot.common.query.ReduceService;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.GroupBy;
@@ -143,9 +145,9 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
         numConsumingSegmentsQueried += Long.parseLong(numConsumingString);
       }
 
-      String minConsumingIndexTsString = metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
-      if (minConsumingIndexTsString != null) {
-        minConsumingFreshnessTimeMs = Math.min(Long.parseLong(minConsumingIndexTsString), minConsumingFreshnessTimeMs);
+      String minConsumingFreshnessTimeMsString = metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
+      if (minConsumingFreshnessTimeMsString != null) {
+          minConsumingFreshnessTimeMs = Math.min(Long.parseLong(minConsumingFreshnessTimeMsString), minConsumingFreshnessTimeMs);
       }
 
       String numTotalRawDocsString = metadata.get(DataTable.TOTAL_DOCS_METADATA_KEY);
@@ -181,9 +183,6 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
     brokerResponseNative.setTotalDocs(numTotalRawDocs);
     brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached);
     if (numConsumingSegmentsQueried > 0) {
-      if (minConsumingFreshnessTimeMs == Long.MAX_VALUE) {
-        LOGGER.error("Invalid lastIndexedTimestamp across {} consuming segments", numConsumingSegmentsQueried);
-      }
       brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsQueried);
       brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs);
     }
@@ -197,6 +196,11 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
           .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter);
       brokerMetrics
           .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter);
+
+      if (numConsumingSegmentsQueried > 0 && minConsumingFreshnessTimeMs > 0) {
+        brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS,
+            System.currentTimeMillis() - minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS);
+      }
     }
 
     // Parse the option from request whether to preserve the type
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 627cda7..51e28ed 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -54,6 +54,7 @@ public abstract class QueryScheduler {
 
   private static final String INVALID_NUM_SCANNED = "-1";
   private static final String INVALID_SEGMENTS_COUNT = "-1";
+  private static final String INVALID_FRESHNESS_MS = "-1";
   private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond";
   private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
 
@@ -173,6 +174,10 @@ public abstract class QueryScheduler {
         Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_PROCESSED, INVALID_SEGMENTS_COUNT));
     long numSegmentsMatched =
         Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_SEGMENTS_MATCHED, INVALID_SEGMENTS_COUNT));
+    long numSegmentsConsuming =
+        Long.parseLong(dataTableMetadata.getOrDefault(DataTable.NUM_CONSUMING_SEGMENTS_QUERIED, INVALID_SEGMENTS_COUNT));
+    long minConsumingFreshnessMs =
+        Long.parseLong(dataTableMetadata.getOrDefault(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, INVALID_FRESHNESS_MS));
 
     if (numDocsScanned > 0) {
       serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
@@ -192,11 +197,13 @@ public abstract class QueryScheduler {
 
     if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) {
       LOGGER.info(
-          "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
-          requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, schedulerWaitMs,
-          timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
-          timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(),
-          numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
+          "Processed requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{},"
+              + "schedulerWaitMs={},totalExecMs={},totalTimeMs={},minConsumingFreshnessMs={},broker={},"
+              + "numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
+          requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched,
+          numSegmentsConsuming, schedulerWaitMs, timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
+          timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), minConsumingFreshnessMs,
+          queryRequest.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
 
       // Limit the dropping log message at most once per second.
       if (numDroppedLogRateLimiter.tryAcquire()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org