Posted to commits@pinot.apache.org by ja...@apache.org on 2022/11/01 21:15:23 UTC

[pinot] branch master updated: Add record availability lag for Kafka connector (#9621)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new dcc496f63b Add record availability lag for Kafka connector (#9621)
dcc496f63b is described below

commit dcc496f63b4488989808ec144a7e2ffcc4697c75
Author: Navina Ramesh <na...@apache.org>
AuthorDate: Tue Nov 1 14:15:18 2022 -0700

    Add record availability lag for Kafka connector (#9621)
---
 .../restlet/resources/SegmentConsumerInfo.java     | 43 +++++++++++++---------
 .../util/ConsumingSegmentInfoReader.java           | 10 ++++-
 .../ConsumingSegmentInfoReaderStatelessTest.java   | 18 ++++-----
 .../realtime/LLRealtimeSegmentDataManager.java     |  9 ++++-
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  5 +++
 .../stream/kafka20/KafkaConsumerPartitionLag.java  |  9 ++++-
 .../kafka20/KafkaStreamMetadataProvider.java       | 24 +++++++++---
 .../stream/kafka20/server/KafkaDataProducer.java   |  5 ++-
 .../pinot/server/api/resources/DebugResource.java  | 26 +++++++------
 .../pinot/server/api/resources/TablesResource.java | 15 ++++----
 .../pinot/spi/stream/ConsumerPartitionState.java   |  8 +++-
 .../apache/pinot/spi/stream/PartitionLagState.java | 12 +++++-
 12 files changed, 123 insertions(+), 61 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
index e77bb97d82..b62a412e17 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
@@ -68,23 +68,28 @@ public class SegmentConsumerInfo {
 
   @JsonIgnoreProperties(ignoreUnknown = true)
   static public class PartitionOffsetInfo {
-      @JsonProperty("currentOffsets")
-      public Map<String, String> _currentOffsets;
-
-      @JsonProperty("recordsLag")
-      public Map<String, String> _recordsLag;
-
-      @JsonProperty("latestUpstreamOffsets")
-      public Map<String, String> _latestUpstreamOffsets;
-
-      public PartitionOffsetInfo(
-          @JsonProperty("currentOffsets") Map<String, String> currentOffsets,
-          @JsonProperty("latestUpstreamOffsets") Map<String, String> latestUpstreamOffsets,
-          @JsonProperty("recordsLag") Map<String, String> recordsLag) {
-        _currentOffsets = currentOffsets;
-        _latestUpstreamOffsets = latestUpstreamOffsets;
-        _recordsLag = recordsLag;
-      }
+    @JsonProperty("currentOffsets")
+    public Map<String, String> _currentOffsets;
+
+    @JsonProperty("recordsLag")
+    public Map<String, String> _recordsLag;
+
+    @JsonProperty("latestUpstreamOffsets")
+    public Map<String, String> _latestUpstreamOffsets;
+
+    @JsonProperty("availabilityLagMs")
+    public Map<String, String> _availabilityLagMs;
+
+    public PartitionOffsetInfo(
+        @JsonProperty("currentOffsets") Map<String, String> currentOffsets,
+        @JsonProperty("latestUpstreamOffsets") Map<String, String> latestUpstreamOffsets,
+        @JsonProperty("recordsLag") Map<String, String> recordsLag,
+        @JsonProperty("availabilityLagMs") Map<String, String> availabilityLagMs) {
+      _currentOffsets = currentOffsets;
+      _latestUpstreamOffsets = latestUpstreamOffsets;
+      _recordsLag = recordsLag;
+      _availabilityLagMs = availabilityLagMs;
+    }
 
     public Map<String, String> getCurrentOffsets() {
       return _currentOffsets;
@@ -97,5 +102,9 @@ public class SegmentConsumerInfo {
     public Map<String, String> getLatestUpstreamOffsets() {
       return _latestUpstreamOffsets;
     }
+
+    public Map<String, String> getAvailabilityLagMs() {
+      return _availabilityLagMs;
+    }
   }
 }
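
A minimal sketch of how the new availabilityLagMs field surfaces once PartitionOffsetInfo is serialized by Jackson; the wrapper class name and all offset/lag values below are illustrative assumptions, not part of this commit:

    import java.util.Map;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;

    public class PartitionOffsetInfoJsonSketch {
      public static void main(String[] args) throws Exception {
        // Constructor argument order matches the diff above:
        // currentOffsets, latestUpstreamOffsets, recordsLag, availabilityLagMs
        SegmentConsumerInfo.PartitionOffsetInfo info = new SegmentConsumerInfo.PartitionOffsetInfo(
            Map.of("0", "1200"),   // current consumer offset for partition 0
            Map.of("0", "1250"),   // latest upstream offset
            Map.of("0", "50"),     // records lag
            Map.of("0", "250"));   // availability lag in milliseconds (the new field)
        // Prints a JSON object containing all four maps, including "availabilityLagMs"
        System.out.println(new ObjectMapper().writeValueAsString(info));
      }
    }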
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
index 8539eb89f8..6caaea3087 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
@@ -83,7 +83,8 @@ public class ConsumingSegmentInfoReader {
       for (SegmentConsumerInfo info : entry.getValue()) {
         SegmentConsumerInfo.PartitionOffsetInfo partitionOffsetInfo = info.getPartitionOffsetInfo();
         PartitionOffsetInfo offsetInfo = new PartitionOffsetInfo(partitionOffsetInfo.getCurrentOffsets(),
-            partitionOffsetInfo.getLatestUpstreamOffsets(), partitionOffsetInfo.getRecordsLag());
+            partitionOffsetInfo.getLatestUpstreamOffsets(), partitionOffsetInfo.getRecordsLag(),
+            partitionOffsetInfo.getAvailabilityLagMs());
         consumingSegmentInfoMap.computeIfAbsent(info.getSegmentName(), k -> new ArrayList<>()).add(
             new ConsumingSegmentInfo(serverName, info.getConsumerState(), info.getLastConsumedTimestamp(),
                 partitionOffsetInfo.getCurrentOffsets(), offsetInfo));
@@ -237,13 +238,18 @@ public class ConsumingSegmentInfoReader {
     @JsonProperty("latestUpstreamOffsetMap")
     public Map<String, String> _latestUpstreamOffsetMap;
 
+    @JsonProperty("availabilityLagMsMap")
+    public Map<String, String> _availabilityLagMap;
+
     public PartitionOffsetInfo(
         @JsonProperty("currentOffsetsMap") Map<String, String> currentOffsetsMap,
         @JsonProperty("latestUpstreamOffsetMap") Map<String, String> latestUpstreamOffsetMap,
-        @JsonProperty("recordsLagMap") Map<String, String> recordsLagMap) {
+        @JsonProperty("recordsLagMap") Map<String, String> recordsLagMap,
+        @JsonProperty("availabilityLagMsMap") Map<String, String> availabilityLagMsMap) {
       _currentOffsetsMap = currentOffsetsMap;
       _latestUpstreamOffsetMap = latestUpstreamOffsetMap;
       _recordsLagMap = recordsLagMap;
+      _availabilityLagMap = availabilityLagMsMap;
     }
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
index 8b07f641c5..c16fe5b573 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
@@ -92,10 +92,10 @@ public class ConsumingSegmentInfoReaderStatelessTest {
         .newArrayList(
             new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
                 partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(
-                    partitionToOffset0, Collections.emptyMap(), Collections.emptyMap())),
+                    partitionToOffset0, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())),
             new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
                 partitionToOffset1, new SegmentConsumerInfo.PartitionOffsetInfo(
-                    partitionToOffset1, Collections.emptyMap(), Collections.emptyMap()))));
+                    partitionToOffset1, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))));
     s0.start(uriPath, createHandler(200, s0._consumerInfos, 0));
     _serverMap.put("server0", s0);
 
@@ -104,10 +104,10 @@ public class ConsumingSegmentInfoReaderStatelessTest {
         .newArrayList(
             new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
                 partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(
-                    partitionToOffset0, Collections.emptyMap(), Collections.emptyMap())),
+                    partitionToOffset0, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())),
             new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
                 partitionToOffset1, new SegmentConsumerInfo.PartitionOffsetInfo(
-                    partitionToOffset1, Collections.emptyMap(), Collections.emptyMap()))));
+                    partitionToOffset1, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))));
     s1.start(uriPath, createHandler(200, s1._consumerInfos, 0));
     _serverMap.put("server1", s1);
 
@@ -115,10 +115,10 @@ public class ConsumingSegmentInfoReaderStatelessTest {
     FakeConsumingInfoServer s2 = new FakeConsumingInfoServer(Lists
         .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "NOT_CONSUMING", 0,
                 partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(
-                    partitionToOffset0, Collections.emptyMap(), Collections.emptyMap())),
+                    partitionToOffset0, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())),
             new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
                 new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
-                    Collections.emptyMap()))));
+                    Collections.emptyMap(), Collections.emptyMap()))));
     s2.start(uriPath, createHandler(200, s2._consumerInfos, 0));
     _serverMap.put("server2", s2);
 
@@ -126,7 +126,7 @@ public class ConsumingSegmentInfoReaderStatelessTest {
     FakeConsumingInfoServer s3 = new FakeConsumingInfoServer(
         Lists.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
             partitionToOffset1, new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
-                Collections.emptyMap()))));
+                Collections.emptyMap(), Collections.emptyMap()))));
     s3.start(uriPath, createHandler(200, s3._consumerInfos, 0));
     _serverMap.put("server3", s3);
 
@@ -134,10 +134,10 @@ public class ConsumingSegmentInfoReaderStatelessTest {
     FakeConsumingInfoServer s4 = new FakeConsumingInfoServer(Lists
         .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
                 partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset0,
-                Collections.emptyMap(), Collections.emptyMap())),
+                Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())),
             new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
                 new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
-                    Collections.emptyMap()))));
+                    Collections.emptyMap(), Collections.emptyMap()))));
     s4.start(uriPath, createHandler(200, s4._consumerInfos, TIMEOUT_MSEC * EXTENDED_TIMEOUT_FACTOR));
     _serverMap.put("server4", s4);
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index f395d71e56..b720f486bb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -280,6 +280,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final StreamPartitionMsgOffset _startOffset;
   private final PartitionLevelStreamConfig _partitionLevelStreamConfig;
 
+  private RowMetadata _lastRowMetadata;
+  private long _lastConsumedTimestampMs = -1;
+
   private long _lastLogTime = 0;
   private int _lastConsumedCount = 0;
   private String _stopReason = null;
@@ -577,6 +580,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           try {
             canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
             indexedMessageCount++;
+            _lastRowMetadata = msgMetadata;
+            _lastConsumedTimestampMs = System.currentTimeMillis();
             realtimeRowsConsumedMeter =
                 _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
                     realtimeRowsConsumedMeter);
@@ -818,14 +823,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   @Override
   public long getLastConsumedTimestamp() {
-    return _lastLogTime;
+    return _lastConsumedTimestampMs;
   }
 
   @Override
   public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
     String partitionGroupId = String.valueOf(_partitionGroupId);
     return Collections.singletonMap(partitionGroupId, new ConsumerPartitionState(partitionGroupId, getCurrentOffset(),
-        getLastConsumedTimestamp(), fetchLatestStreamOffset(5_000)));
+        getLastConsumedTimestamp(), fetchLatestStreamOffset(5_000), _lastRowMetadata));
   }
 
   @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index b911e23106..6a52a31df1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -1084,6 +1084,11 @@ public class LLRealtimeSegmentDataManagerTest {
       _postConsumeStoppedCalled = true;
     }
 
+    // TODO: Some of the tests rely on specific number of calls to the `now()` method in the SegmentDataManager.
+    // This is not a good coding practice and makes the code very fragile. This needs to be fixed.
+    // Invoking now() in any part of LLRealtimeSegmentDataManager code will break the following tests:
+    // 1. LLRealtimeSegmentDataManagerTest.testShouldNotSkipUnfilteredMessagesIfNotIndexedAndRowCountThresholdIsReached
+    // 2. LLRealtimeSegmentDataManagerTest.testShouldNotSkipUnfilteredMessagesIfNotIndexedAndTimeThresholdIsReached
     @Override
     protected long now() {
       // now() is called in the constructor before _timeSupplier is set
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java
index 709c2a7d79..4e4ab2a034 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java
@@ -23,12 +23,19 @@ import org.apache.pinot.spi.stream.PartitionLagState;
 
 public class KafkaConsumerPartitionLag extends PartitionLagState {
   private final String _recordsLag;
+  private final String _availabilityLagMs;
 
-  public KafkaConsumerPartitionLag(String recordsLag) {
+  public KafkaConsumerPartitionLag(String recordsLag, String availabilityLagMs) {
     _recordsLag = recordsLag;
+    _availabilityLagMs = availabilityLagMs;
   }
 
   public String getRecordsLag() {
     return _recordsLag;
   }
+
+  @Override
+  public String getAvailabilityLagMs() {
+    return _availabilityLagMs;
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index c6fa7ff218..022b38273e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -31,6 +31,7 @@ import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -113,14 +114,27 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
       Map<String, ConsumerPartitionState> currentPartitionStateMap) {
     Map<String, PartitionLagState> perPartitionLag = new HashMap<>();
     for (Map.Entry<String, ConsumerPartitionState> entry: currentPartitionStateMap.entrySet()) {
-      StreamPartitionMsgOffset currentOffset = entry.getValue().getCurrentOffset();
-      StreamPartitionMsgOffset upstreamLatest = entry.getValue().getUpstreamLatestOffset();
+      ConsumerPartitionState partitionState = entry.getValue();
+      // Compute records-lag
+      StreamPartitionMsgOffset currentOffset = partitionState.getCurrentOffset();
+      StreamPartitionMsgOffset upstreamLatest = partitionState.getUpstreamLatestOffset();
+      String offsetLagString = "UNKNOWN";
+
       if (currentOffset instanceof LongMsgOffset && upstreamLatest instanceof LongMsgOffset) {
         long offsetLag = ((LongMsgOffset) upstreamLatest).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
-        perPartitionLag.put(entry.getKey(), new KafkaConsumerPartitionLag(String.valueOf(offsetLag)));
-      } else {
-        perPartitionLag.put(entry.getKey(), new KafkaConsumerPartitionLag("UNKNOWN"));
+        offsetLagString = String.valueOf(offsetLag);
       }
+
+      // Compute record-availability
+      String availabilityLagMs = "UNKNOWN";
+      RowMetadata lastProcessedMessageMetadata = partitionState.getLastProcessedRowMetadata();
+      if (lastProcessedMessageMetadata != null && partitionState.getLastProcessedTimeMs() > 0) {
+        long availabilityLag = partitionState.getLastProcessedTimeMs()
+            - lastProcessedMessageMetadata.getRecordIngestionTimeMs();
+        availabilityLagMs = String.valueOf(availabilityLag);
+      }
+
+      perPartitionLag.put(entry.getKey(), new KafkaConsumerPartitionLag(offsetLagString, availabilityLagMs));
     }
     return perPartitionLag;
   }
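
To make the arithmetic in the loop above concrete, a self-contained sketch with illustrative numbers; in the real code both inputs come from ConsumerPartitionState and the resulting strings are wrapped in KafkaConsumerPartitionLag:

    public class LagComputationSketch {
      public static void main(String[] args) {
        long currentOffset = 1_200L;                       // consumer's current offset
        long upstreamLatest = 1_250L;                      // latest offset in the Kafka partition
        long recordIngestionTimeMs = 1_667_340_000_000L;   // record's upstream ingestion time
        long lastProcessedTimeMs = 1_667_340_000_250L;     // when Pinot indexed that record

        // records lag: how many records the consumer trails the partition head
        long recordsLag = upstreamLatest - currentOffset;                      // 50
        // availability lag: how long after upstream ingestion the record was consumed
        long availabilityLagMs = lastProcessedTimeMs - recordIngestionTimeMs;  // 250

        System.out.println("recordsLag=" + recordsLag + ", availabilityLagMs=" + availabilityLagMs);
      }
    }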
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
index 0bfc7a6cac..9e1e627b12 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
@@ -78,7 +78,8 @@ public class KafkaDataProducer implements StreamDataProducer {
   @Override
   public void produce(String topic, byte[] key, byte[] payload, GenericRow headers) {
     List<Header> headerList = new ArrayList<>();
-    headerList.add(new RecordHeader("producerTimestamp", String.valueOf(System.currentTimeMillis()).getBytes(
+    long nowMs = System.currentTimeMillis();
+    headerList.add(new RecordHeader("producerTimestamp", String.valueOf(nowMs).getBytes(
         StandardCharsets.UTF_8)));
     if (headers != null) {
       headers.getFieldToValueMap().forEach((k, v) -> {
@@ -88,7 +89,7 @@ public class KafkaDataProducer implements StreamDataProducer {
         }
       });
     }
-    _producer.send(new ProducerRecord<>(topic, null, key, payload, headerList));
+    _producer.send(new ProducerRecord<>(topic, null, nowMs, key, payload, headerList));
     _producer.flush();
   }
 
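The producer change above sets an explicit record timestamp (nowMs), which is what a downstream consumer observes via ConsumerRecord#timestamp(). A hedged sketch using the plain Kafka consumer API, not Pinot code; the broker address, group id, and topic name are assumptions, and the topic is assumed to use the default CreateTime timestamp semantics:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class RecordTimestampSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");            // assumed local broker
        props.put("group.id", "availability-lag-demo");              // assumed group id
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("my-topic")); // assumed topic name
          ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
          for (ConsumerRecord<byte[], byte[]> record : records) {
            // record.timestamp() carries the producer-set timestamp (nowMs above), so
            // consumption time minus that timestamp approximates the availability lag
            long approxAvailabilityLagMs = System.currentTimeMillis() - record.timestamp();
            System.out.println("approx availability lag ms: " + approxAvailabilityLagMs);
          }
        }
      }
    }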
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
index 9132f28556..5c666a28bd 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +52,6 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.server.starter.ServerInstance;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
-import org.apache.pinot.spi.stream.PartitionLagState;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
@@ -168,22 +168,24 @@ public class DebugResource {
     if (tableType == TableType.REALTIME) {
       RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
       Map<String, ConsumerPartitionState> partitionStateMap = realtimeSegmentDataManager.getConsumerPartitionState();
-      Map<String, PartitionLagState> partitionLagStateMap =
-          realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap);
-      Map<String, String> partitionToCurrentOffsetMap = realtimeSegmentDataManager.getPartitionToCurrentOffset();
+      Map<String, String> currentOffsets = realtimeSegmentDataManager.getPartitionToCurrentOffset();
+      Map<String, String> upstreamLatest = partitionStateMap.entrySet().stream().collect(
+          Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString()));
+      Map<String, String> recordsLagMap = new HashMap<>();
+      Map<String, String> availabilityLagMsMap = new HashMap<>();
+      realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap).forEach((k, v) -> {
+        recordsLagMap.put(k, v.getRecordsLag());
+        availabilityLagMsMap.put(k, v.getAvailabilityLagMs());
+      });
+
       segmentConsumerInfo =
           new SegmentConsumerInfo(
               segmentDataManager.getSegmentName(),
               realtimeSegmentDataManager.getConsumerState().toString(),
               realtimeSegmentDataManager.getLastConsumedTimestamp(),
-              partitionToCurrentOffsetMap,
-              new SegmentConsumerInfo.PartitionOffsetInfo(partitionToCurrentOffsetMap,
-                  partitionStateMap.entrySet().stream().collect(
-                      Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString())
-                  ),
-                  partitionLagStateMap.entrySet().stream().collect(
-                      Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getRecordsLag())
-                  )));
+              currentOffsets,
+              new SegmentConsumerInfo.PartitionOffsetInfo(currentOffsets,
+                  upstreamLatest, recordsLagMap, availabilityLagMsMap));
     }
     return segmentConsumerInfo;
   }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 8d67bf5ce7..2658a45d99 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -90,7 +90,6 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.stream.ConsumerPartitionState;
-import org.apache.pinot.spi.stream.PartitionLagState;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -517,8 +516,12 @@ public class TablesResource {
           RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
           Map<String, ConsumerPartitionState> partitionStateMap =
               realtimeSegmentDataManager.getConsumerPartitionState();
-          Map<String, PartitionLagState> partitionLagStateMap =
-              realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap);
+          Map<String, String> recordsLagMap = new HashMap<>();
+          Map<String, String> availabilityLagMsMap = new HashMap<>();
+          realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap).forEach((k, v) -> {
+            recordsLagMap.put(k, v.getRecordsLag());
+            availabilityLagMsMap.put(k, v.getAvailabilityLagMs());
+          });
           @Deprecated Map<String, String> partitiionToOffsetMap =
               realtimeSegmentDataManager.getPartitionToCurrentOffset();
           segmentConsumerInfoList.add(
@@ -530,11 +533,7 @@ public class TablesResource {
                       partitiionToOffsetMap,
                       partitionStateMap.entrySet().stream().collect(
                           Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString())
-                      ),
-                      partitionLagStateMap.entrySet().stream().collect(
-                          Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getRecordsLag())
-                      )))
-          );
+                      ), recordsLagMap, availabilityLagMsMap)));
         }
       }
     } catch (Exception e) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java
index 9da90d5bff..668f771077 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java
@@ -29,13 +29,15 @@ public class ConsumerPartitionState {
   private final StreamPartitionMsgOffset _currentOffset;
   private final long _lastProcessedTimeMs;
   private final StreamPartitionMsgOffset _upstreamLatestOffset;
+  private final RowMetadata _lastProcessedRowMetadata;
 
   public ConsumerPartitionState(String partitionId, StreamPartitionMsgOffset currentOffset, long lastProcessedTimeMs,
-      @Nullable StreamPartitionMsgOffset upstreamLatestOffset) {
+      @Nullable StreamPartitionMsgOffset upstreamLatestOffset, @Nullable RowMetadata lastProcessedRowMetadata) {
     _partitionId = partitionId;
     _currentOffset = currentOffset;
     _lastProcessedTimeMs = lastProcessedTimeMs;
     _upstreamLatestOffset = upstreamLatestOffset;
+    _lastProcessedRowMetadata = lastProcessedRowMetadata;
   }
 
   public StreamPartitionMsgOffset getCurrentOffset() {
@@ -53,4 +55,8 @@ public class ConsumerPartitionState {
   public StreamPartitionMsgOffset getUpstreamLatestOffset() {
     return _upstreamLatestOffset;
   }
+
+  public RowMetadata getLastProcessedRowMetadata() {
+    return _lastProcessedRowMetadata;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
index 21e2222403..ead8246baf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
@@ -27,12 +27,20 @@ public class PartitionLagState {
   protected final static String NOT_CALCULATED = "NOT_CALCULATED";
 
   /**
-   * Defines how far away the current record's offset / pointer is from upstream latest record
+   * Defines how far behind the current record's offset / pointer is from upstream latest record
    * The distance is based on actual record count.
    */
   public String getRecordsLag() {
     return NOT_CALCULATED;
   }
 
-  // TODO: Define record availability lag ($latest_record_consumption_time - $latest_record_ingestion_time)
+  /**
+   * Defines how soon after record ingestion was the record consumed by Pinot. That is, the difference between the
+   * time the record was consumed and the time at which the record was ingested upstream.
+   *
+   * @return Lag value in milliseconds
+   */
+  public String getAvailabilityLagMs() {
+    return NOT_CALCULATED;
+  }
 }
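
Since the base class defaults both metrics to NOT_CALCULATED, a connector other than Kafka would opt in by overriding the getters, mirroring KafkaConsumerPartitionLag above. A purely hypothetical sketch; the class name is made up:

    import org.apache.pinot.spi.stream.PartitionLagState;

    public class MyStreamPartitionLag extends PartitionLagState {
      private final String _recordsLag;
      private final String _availabilityLagMs;

      public MyStreamPartitionLag(String recordsLag, String availabilityLagMs) {
        _recordsLag = recordsLag;
        _availabilityLagMs = availabilityLagMs;
      }

      @Override
      public String getRecordsLag() {
        return _recordsLag;
      }

      @Override
      public String getAvailabilityLagMs() {
        return _availabilityLagMs;
      }
    }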

