You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2019/06/17 17:34:15 UTC

[incubator-pinot] branch master updated: Emit freshness lag at server level as well (#4308)

This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ad6a46c  Emit freshness lag at server level as well (#4308)
ad6a46c is described below

commit ad6a46c3e472022ed23764b7f91aa19db97a1092
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Mon Jun 17 10:34:09 2019 -0700

    Emit freshness lag at server level as well (#4308)
    
    * Emit freshness lag at server level as well
    
    * Address review comments
---
 .../java/org/apache/pinot/common/metadata/RowMetadata.java    | 11 ++++++++---
 .../java/org/apache/pinot/common/metrics/ServerTimer.java     |  6 +++++-
 .../pinot/core/indexsegment/mutable/MutableSegmentImpl.java   |  3 +--
 .../org/apache/pinot/core/query/scheduler/QueryScheduler.java |  6 ++++++
 .../pinot/core/realtime/stream/StreamMessageMetadata.java     |  6 ++++++
 5 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
index 4e67a05..63be9f1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
@@ -23,16 +23,21 @@ import org.apache.pinot.annotations.InterfaceStability;
 
 
 /**
- * A class that provides relevant row-level metadata for rows ingested into a segment.
+ * An interface that provides relevant row-level metadata for rows indexed into a segment.
  *
- * Currently this is relevant for rows ingested into a mutable segment.
+ * Currently this is relevant for rows ingested into a mutable segment - the metadata is expected to be
+ * provided by the underlying stream.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface RowMetadata {
 
   /**
-   * Return the ingestion timestamp of the row.
+   * Return the timestamp associated with when the row was ingested upstream.
+   * Expected to be mainly used for stream-based sources.
+   *
+   * @return timestamp (epoch in milliseconds) when the row was ingested upstream,
+   *         or Long.MIN_VALUE if not available
    */
   long getIngestionTimeMs();
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index a8560f0..326ae10 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -26,7 +26,11 @@ import org.apache.pinot.common.Utils;
  *
  */
 public enum ServerTimer implements AbstractMetrics.Timer {
-  CURRENT_MSG_EVENT_TIMESTAMP_LAG("currentMsgEventTimestampLag", false);
+  // No usages of this timer were found, hence it is marked deprecated
+  @Deprecated
+  CURRENT_MSG_EVENT_TIMESTAMP_LAG("currentMsgEventTimestampLag", false),
+  // metric tracking the freshness lag for consuming segments
+  FRESHNESS_LAG_MS("freshnessLagMs", false);
 
   private final String timerName;
   private final boolean global;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 3a09bbb..972d91c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -99,7 +99,6 @@ public class MutableSegmentImpl implements MutableSegment {
   private final int _numKeyColumns;
 
   // default message metadata
-  private static final StreamMessageMetadata _defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis());
   private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
   private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;
 
@@ -243,7 +242,7 @@ public class MutableSegmentImpl implements MutableSegment {
 
     _lastIndexedTimeMs = System.currentTimeMillis();
 
-    if (rowMetadata != null) {
+    if (rowMetadata != null && rowMetadata.getIngestionTimeMs() != Long.MIN_VALUE) {
       _latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, rowMetadata.getIngestionTimeMs());
     }
     return canTakeMore;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 51e28ed..24c4ec0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFutureTask;
 import com.google.common.util.concurrent.RateLimiter;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nonnull;
@@ -34,6 +35,7 @@ import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableImplV2;
@@ -220,6 +222,10 @@ public abstract class QueryScheduler {
       numDroppedLogCounter.incrementAndGet();
     }
 
+    if (minConsumingFreshnessMs > -1) {
+      serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.FRESHNESS_LAG_MS,
+          (System.currentTimeMillis() - minConsumingFreshnessMs), TimeUnit.MILLISECONDS);
+    }
     serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
     serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
     serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
index 0b3ad84..9ad416a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
@@ -29,6 +29,12 @@ public class StreamMessageMetadata implements RowMetadata {
 
   private final long _ingestionTimeMs;
 
+  /**
+   * Constructs the metadata for a stream-based message/row.
+   *
+   * @param ingestionTimeMs  the time that the message was ingested by the stream provider;
+   *                         use Long.MIN_VALUE if not applicable
+   */
   public StreamMessageMetadata(long ingestionTimeMs) {
     _ingestionTimeMs = ingestionTimeMs;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org