You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2019/06/17 17:34:15 UTC
[incubator-pinot] branch master updated: Emit freshness lag at server level as well (#4308)
This is an automated email from the ASF dual-hosted git repository.
sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ad6a46c Emit freshness lag at server level as well (#4308)
ad6a46c is described below
commit ad6a46c3e472022ed23764b7f91aa19db97a1092
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Mon Jun 17 10:34:09 2019 -0700
Emit freshness lag at server level as well (#4308)
* Emit freshness lag at server level as well
* Address review comments
---
.../java/org/apache/pinot/common/metadata/RowMetadata.java | 11 ++++++++---
.../java/org/apache/pinot/common/metrics/ServerTimer.java | 6 +++++-
.../pinot/core/indexsegment/mutable/MutableSegmentImpl.java | 3 +--
.../org/apache/pinot/core/query/scheduler/QueryScheduler.java | 6 ++++++
.../pinot/core/realtime/stream/StreamMessageMetadata.java | 6 ++++++
5 files changed, 26 insertions(+), 6 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
index 4e67a05..63be9f1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
@@ -23,16 +23,21 @@ import org.apache.pinot.annotations.InterfaceStability;
/**
- * A class that provides relevant row-level metadata for rows ingested into a segment.
+ * A class that provides relevant row-level metadata for rows indexed into a segment.
*
- * Currently this is relevant for rows ingested into a mutable segment.
+ * Currently this is relevant for rows ingested into a mutable segment - the metadata is expected to be
+ * provided by the underlying stream.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface RowMetadata {
/**
- * Return the ingestion timestamp of the row.
+ * Return the timestamp associated with when the row was ingested upstream.
+ * Expected to be mainly used for stream-based sources.
+ *
+ * @return timestamp (epoch in milliseconds) when the row was ingested upstream
+ * Long.MIN_VALUE if not available
*/
long getIngestionTimeMs();
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index a8560f0..326ae10 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -26,7 +26,11 @@ import org.apache.pinot.common.Utils;
*
*/
public enum ServerTimer implements AbstractMetrics.Timer {
- CURRENT_MSG_EVENT_TIMESTAMP_LAG("currentMsgEventTimestampLag", false);
+ // don't see usages for this
+ @Deprecated
+ CURRENT_MSG_EVENT_TIMESTAMP_LAG("currentMsgEventTimestampLag", false),
+ // metric tracking the freshness lag for consuming segments
+ FRESHNESS_LAG_MS("freshnessLagMs", false);
private final String timerName;
private final boolean global;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 3a09bbb..972d91c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -99,7 +99,6 @@ public class MutableSegmentImpl implements MutableSegment {
private final int _numKeyColumns;
// default message metadata
- private static final StreamMessageMetadata _defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis());
private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;
@@ -243,7 +242,7 @@ public class MutableSegmentImpl implements MutableSegment {
_lastIndexedTimeMs = System.currentTimeMillis();
- if (rowMetadata != null) {
+ if (rowMetadata != null && rowMetadata.getIngestionTimeMs() != Long.MIN_VALUE) {
_latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, rowMetadata.getIngestionTimeMs());
}
return canTakeMore;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 51e28ed..24c4ec0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.RateLimiter;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAccumulator;
import javax.annotation.Nonnull;
@@ -34,6 +35,7 @@ import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableImplV2;
@@ -220,6 +222,10 @@ public abstract class QueryScheduler {
numDroppedLogCounter.incrementAndGet();
}
+ if (minConsumingFreshnessMs > -1) {
+ serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.FRESHNESS_LAG_MS,
+ (System.currentTimeMillis() - minConsumingFreshnessMs), TimeUnit.MILLISECONDS);
+ }
serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
index 0b3ad84..9ad416a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
@@ -29,6 +29,12 @@ public class StreamMessageMetadata implements RowMetadata {
private final long _ingestionTimeMs;
+ /**
+ * Construct the stream based message/row message metadata
+ *
+ * @param ingestionTimeMs the time that the message was ingested by the stream provider
+ * use Long.MIN_VALUE if not applicable
+ */
public StreamMessageMetadata(long ingestionTimeMs) {
_ingestionTimeMs = ingestionTimeMs;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org