You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/11/01 21:15:23 UTC
[pinot] branch master updated: Add record availability lag for Kafka connector (#9621)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new dcc496f63b Add record availability lag for Kafka connector (#9621)
dcc496f63b is described below
commit dcc496f63b4488989808ec144a7e2ffcc4697c75
Author: Navina Ramesh <na...@apache.org>
AuthorDate: Tue Nov 1 14:15:18 2022 -0700
Add record availability lag for Kafka connector (#9621)
---
.../restlet/resources/SegmentConsumerInfo.java | 43 +++++++++++++---------
.../util/ConsumingSegmentInfoReader.java | 10 ++++-
.../ConsumingSegmentInfoReaderStatelessTest.java | 18 ++++-----
.../realtime/LLRealtimeSegmentDataManager.java | 9 ++++-
.../realtime/LLRealtimeSegmentDataManagerTest.java | 5 +++
.../stream/kafka20/KafkaConsumerPartitionLag.java | 9 ++++-
.../kafka20/KafkaStreamMetadataProvider.java | 24 +++++++++---
.../stream/kafka20/server/KafkaDataProducer.java | 5 ++-
.../pinot/server/api/resources/DebugResource.java | 26 +++++++------
.../pinot/server/api/resources/TablesResource.java | 15 ++++----
.../pinot/spi/stream/ConsumerPartitionState.java | 8 +++-
.../apache/pinot/spi/stream/PartitionLagState.java | 12 +++++-
12 files changed, 123 insertions(+), 61 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
index e77bb97d82..b62a412e17 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
@@ -68,23 +68,28 @@ public class SegmentConsumerInfo {
@JsonIgnoreProperties(ignoreUnknown = true)
static public class PartitionOffsetInfo {
- @JsonProperty("currentOffsets")
- public Map<String, String> _currentOffsets;
-
- @JsonProperty("recordsLag")
- public Map<String, String> _recordsLag;
-
- @JsonProperty("latestUpstreamOffsets")
- public Map<String, String> _latestUpstreamOffsets;
-
- public PartitionOffsetInfo(
- @JsonProperty("currentOffsets") Map<String, String> currentOffsets,
- @JsonProperty("latestUpstreamOffsets") Map<String, String> latestUpstreamOffsets,
- @JsonProperty("recordsLag") Map<String, String> recordsLag) {
- _currentOffsets = currentOffsets;
- _latestUpstreamOffsets = latestUpstreamOffsets;
- _recordsLag = recordsLag;
- }
+ @JsonProperty("currentOffsets")
+ public Map<String, String> _currentOffsets;
+
+ @JsonProperty("recordsLag")
+ public Map<String, String> _recordsLag;
+
+ @JsonProperty("latestUpstreamOffsets")
+ public Map<String, String> _latestUpstreamOffsets;
+
+ @JsonProperty("availabilityLagMs")
+ public Map<String, String> _availabilityLagMs;
+
+ public PartitionOffsetInfo(
+ @JsonProperty("currentOffsets") Map<String, String> currentOffsets,
+ @JsonProperty("latestUpstreamOffsets") Map<String, String> latestUpstreamOffsets,
+ @JsonProperty("recordsLag") Map<String, String> recordsLag,
+ @JsonProperty("availabilityLagMs") Map<String, String> availabilityLagMs) {
+ _currentOffsets = currentOffsets;
+ _latestUpstreamOffsets = latestUpstreamOffsets;
+ _recordsLag = recordsLag;
+ _availabilityLagMs = availabilityLagMs;
+ }
public Map<String, String> getCurrentOffsets() {
return _currentOffsets;
@@ -97,5 +102,9 @@ public class SegmentConsumerInfo {
public Map<String, String> getLatestUpstreamOffsets() {
return _latestUpstreamOffsets;
}
+
+ public Map<String, String> getAvailabilityLagMs() {
+ return _availabilityLagMs;
+ }
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
index 8539eb89f8..6caaea3087 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
@@ -83,7 +83,8 @@ public class ConsumingSegmentInfoReader {
for (SegmentConsumerInfo info : entry.getValue()) {
SegmentConsumerInfo.PartitionOffsetInfo partitionOffsetInfo = info.getPartitionOffsetInfo();
PartitionOffsetInfo offsetInfo = new PartitionOffsetInfo(partitionOffsetInfo.getCurrentOffsets(),
- partitionOffsetInfo.getLatestUpstreamOffsets(), partitionOffsetInfo.getRecordsLag());
+ partitionOffsetInfo.getLatestUpstreamOffsets(), partitionOffsetInfo.getRecordsLag(),
+ partitionOffsetInfo.getAvailabilityLagMs());
consumingSegmentInfoMap.computeIfAbsent(info.getSegmentName(), k -> new ArrayList<>()).add(
new ConsumingSegmentInfo(serverName, info.getConsumerState(), info.getLastConsumedTimestamp(),
partitionOffsetInfo.getCurrentOffsets(), offsetInfo));
@@ -237,13 +238,18 @@ public class ConsumingSegmentInfoReader {
@JsonProperty("latestUpstreamOffsetMap")
public Map<String, String> _latestUpstreamOffsetMap;
+ @JsonProperty("availabilityLagMsMap")
+ public Map<String, String> _availabilityLagMap;
+
public PartitionOffsetInfo(
@JsonProperty("currentOffsetsMap") Map<String, String> currentOffsetsMap,
@JsonProperty("latestUpstreamOffsetMap") Map<String, String> latestUpstreamOffsetMap,
- @JsonProperty("recordsLagMap") Map<String, String> recordsLagMap) {
+ @JsonProperty("recordsLagMap") Map<String, String> recordsLagMap,
+ @JsonProperty("availabilityLagMsMap") Map<String, String> availabilityLagMsMap) {
_currentOffsetsMap = currentOffsetsMap;
_latestUpstreamOffsetMap = latestUpstreamOffsetMap;
_recordsLagMap = recordsLagMap;
+ _availabilityLagMap = availabilityLagMsMap;
}
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
index 8b07f641c5..c16fe5b573 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
@@ -92,10 +92,10 @@ public class ConsumingSegmentInfoReaderStatelessTest {
.newArrayList(
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(
- partitionToOffset0, Collections.emptyMap(), Collections.emptyMap())),
+ partitionToOffset0, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
partitionToOffset1, new SegmentConsumerInfo.PartitionOffsetInfo(
- partitionToOffset1, Collections.emptyMap(), Collections.emptyMap()))));
+ partitionToOffset1, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))));
s0.start(uriPath, createHandler(200, s0._consumerInfos, 0));
_serverMap.put("server0", s0);
@@ -104,10 +104,10 @@ public class ConsumingSegmentInfoReaderStatelessTest {
.newArrayList(
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(
- partitionToOffset0, Collections.emptyMap(), Collections.emptyMap())),
+ partitionToOffset0, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
partitionToOffset1, new SegmentConsumerInfo.PartitionOffsetInfo(
- partitionToOffset1, Collections.emptyMap(), Collections.emptyMap()))));
+ partitionToOffset1, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))));
s1.start(uriPath, createHandler(200, s1._consumerInfos, 0));
_serverMap.put("server1", s1);
@@ -115,10 +115,10 @@ public class ConsumingSegmentInfoReaderStatelessTest {
FakeConsumingInfoServer s2 = new FakeConsumingInfoServer(Lists
.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "NOT_CONSUMING", 0,
partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(
- partitionToOffset0, Collections.emptyMap(), Collections.emptyMap())),
+ partitionToOffset0, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
- Collections.emptyMap()))));
+ Collections.emptyMap(), Collections.emptyMap()))));
s2.start(uriPath, createHandler(200, s2._consumerInfos, 0));
_serverMap.put("server2", s2);
@@ -126,7 +126,7 @@ public class ConsumingSegmentInfoReaderStatelessTest {
FakeConsumingInfoServer s3 = new FakeConsumingInfoServer(
Lists.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
partitionToOffset1, new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
- Collections.emptyMap()))));
+ Collections.emptyMap(), Collections.emptyMap()))));
s3.start(uriPath, createHandler(200, s3._consumerInfos, 0));
_serverMap.put("server3", s3);
@@ -134,10 +134,10 @@ public class ConsumingSegmentInfoReaderStatelessTest {
FakeConsumingInfoServer s4 = new FakeConsumingInfoServer(Lists
.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset0,
- Collections.emptyMap(), Collections.emptyMap())),
+ Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())),
new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
- Collections.emptyMap()))));
+ Collections.emptyMap(), Collections.emptyMap()))));
s4.start(uriPath, createHandler(200, s4._consumerInfos, TIMEOUT_MSEC * EXTENDED_TIMEOUT_FACTOR));
_serverMap.put("server4", s4);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index f395d71e56..b720f486bb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -280,6 +280,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private final StreamPartitionMsgOffset _startOffset;
private final PartitionLevelStreamConfig _partitionLevelStreamConfig;
+ private RowMetadata _lastRowMetadata;
+ private long _lastConsumedTimestampMs = -1;
+
private long _lastLogTime = 0;
private int _lastConsumedCount = 0;
private String _stopReason = null;
@@ -577,6 +580,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
try {
canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
indexedMessageCount++;
+ _lastRowMetadata = msgMetadata;
+ _lastConsumedTimestampMs = System.currentTimeMillis();
realtimeRowsConsumedMeter =
_serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
realtimeRowsConsumedMeter);
@@ -818,14 +823,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
@Override
public long getLastConsumedTimestamp() {
- return _lastLogTime;
+ return _lastConsumedTimestampMs;
}
@Override
public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
String partitionGroupId = String.valueOf(_partitionGroupId);
return Collections.singletonMap(partitionGroupId, new ConsumerPartitionState(partitionGroupId, getCurrentOffset(),
- getLastConsumedTimestamp(), fetchLatestStreamOffset(5_000)));
+ getLastConsumedTimestamp(), fetchLatestStreamOffset(5_000), _lastRowMetadata));
}
@Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index b911e23106..6a52a31df1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -1084,6 +1084,11 @@ public class LLRealtimeSegmentDataManagerTest {
_postConsumeStoppedCalled = true;
}
+ // TODO: Some of the tests rely on a specific number of calls to the `now()` method in the SegmentDataManager.
+ // This is not a good coding practice and makes the code very fragile. This needs to be fixed.
+ // Invoking now() in any part of LLRealtimeSegmentDataManager code will break the following tests:
+ // 1. LLRealtimeSegmentDataManagerTest.testShouldNotSkipUnfilteredMessagesIfNotIndexedAndRowCountThresholdIsReached
+ // 2. LLRealtimeSegmentDataManagerTest.testShouldNotSkipUnfilteredMessagesIfNotIndexedAndTimeThresholdIsReached
@Override
protected long now() {
// now() is called in the constructor before _timeSupplier is set
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java
index 709c2a7d79..4e4ab2a034 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java
@@ -23,12 +23,19 @@ import org.apache.pinot.spi.stream.PartitionLagState;
public class KafkaConsumerPartitionLag extends PartitionLagState {
private final String _recordsLag;
+ private final String _availabilityLagMs;
- public KafkaConsumerPartitionLag(String recordsLag) {
+ public KafkaConsumerPartitionLag(String recordsLag, String availabilityLagMs) {
_recordsLag = recordsLag;
+ _availabilityLagMs = availabilityLagMs;
}
public String getRecordsLag() {
return _recordsLag;
}
+
+ @Override
+ public String getAvailabilityLagMs() {
+ return _availabilityLagMs;
+ }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index c6fa7ff218..022b38273e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -31,6 +31,7 @@ import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -113,14 +114,27 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
Map<String, ConsumerPartitionState> currentPartitionStateMap) {
Map<String, PartitionLagState> perPartitionLag = new HashMap<>();
for (Map.Entry<String, ConsumerPartitionState> entry: currentPartitionStateMap.entrySet()) {
- StreamPartitionMsgOffset currentOffset = entry.getValue().getCurrentOffset();
- StreamPartitionMsgOffset upstreamLatest = entry.getValue().getUpstreamLatestOffset();
+ ConsumerPartitionState partitionState = entry.getValue();
+ // Compute records-lag
+ StreamPartitionMsgOffset currentOffset = partitionState.getCurrentOffset();
+ StreamPartitionMsgOffset upstreamLatest = partitionState.getUpstreamLatestOffset();
+ String offsetLagString = "UNKNOWN";
+
if (currentOffset instanceof LongMsgOffset && upstreamLatest instanceof LongMsgOffset) {
long offsetLag = ((LongMsgOffset) upstreamLatest).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
- perPartitionLag.put(entry.getKey(), new KafkaConsumerPartitionLag(String.valueOf(offsetLag)));
- } else {
- perPartitionLag.put(entry.getKey(), new KafkaConsumerPartitionLag("UNKNOWN"));
+ offsetLagString = String.valueOf(offsetLag);
}
+
+ // Compute record-availability
+ String availabilityLagMs = "UNKNOWN";
+ RowMetadata lastProcessedMessageMetadata = partitionState.getLastProcessedRowMetadata();
+ if (lastProcessedMessageMetadata != null && partitionState.getLastProcessedTimeMs() > 0) {
+ long availabilityLag = partitionState.getLastProcessedTimeMs()
+ - lastProcessedMessageMetadata.getRecordIngestionTimeMs();
+ availabilityLagMs = String.valueOf(availabilityLag);
+ }
+
+ perPartitionLag.put(entry.getKey(), new KafkaConsumerPartitionLag(offsetLagString, availabilityLagMs));
}
return perPartitionLag;
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
index 0bfc7a6cac..9e1e627b12 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
@@ -78,7 +78,8 @@ public class KafkaDataProducer implements StreamDataProducer {
@Override
public void produce(String topic, byte[] key, byte[] payload, GenericRow headers) {
List<Header> headerList = new ArrayList<>();
- headerList.add(new RecordHeader("producerTimestamp", String.valueOf(System.currentTimeMillis()).getBytes(
+ long nowMs = System.currentTimeMillis();
+ headerList.add(new RecordHeader("producerTimestamp", String.valueOf(nowMs).getBytes(
StandardCharsets.UTF_8)));
if (headers != null) {
headers.getFieldToValueMap().forEach((k, v) -> {
@@ -88,7 +89,7 @@ public class KafkaDataProducer implements StreamDataProducer {
}
});
}
- _producer.send(new ProducerRecord<>(topic, null, key, payload, headerList));
+ _producer.send(new ProducerRecord<>(topic, null, nowMs, key, payload, headerList));
_producer.flush();
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
index 9132f28556..5c666a28bd 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -51,7 +52,6 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.server.starter.ServerInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
-import org.apache.pinot.spi.stream.PartitionLagState;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
@@ -168,22 +168,24 @@ public class DebugResource {
if (tableType == TableType.REALTIME) {
RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
Map<String, ConsumerPartitionState> partitionStateMap = realtimeSegmentDataManager.getConsumerPartitionState();
- Map<String, PartitionLagState> partitionLagStateMap =
- realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap);
- Map<String, String> partitionToCurrentOffsetMap = realtimeSegmentDataManager.getPartitionToCurrentOffset();
+ Map<String, String> currentOffsets = realtimeSegmentDataManager.getPartitionToCurrentOffset();
+ Map<String, String> upstreamLatest = partitionStateMap.entrySet().stream().collect(
+ Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString()));
+ Map<String, String> recordsLagMap = new HashMap<>();
+ Map<String, String> availabilityLagMsMap = new HashMap<>();
+ realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap).forEach((k, v) -> {
+ recordsLagMap.put(k, v.getRecordsLag());
+ availabilityLagMsMap.put(k, v.getAvailabilityLagMs());
+ });
+
segmentConsumerInfo =
new SegmentConsumerInfo(
segmentDataManager.getSegmentName(),
realtimeSegmentDataManager.getConsumerState().toString(),
realtimeSegmentDataManager.getLastConsumedTimestamp(),
- partitionToCurrentOffsetMap,
- new SegmentConsumerInfo.PartitionOffsetInfo(partitionToCurrentOffsetMap,
- partitionStateMap.entrySet().stream().collect(
- Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString())
- ),
- partitionLagStateMap.entrySet().stream().collect(
- Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getRecordsLag())
- )));
+ currentOffsets,
+ new SegmentConsumerInfo.PartitionOffsetInfo(currentOffsets,
+ upstreamLatest, recordsLagMap, availabilityLagMsMap));
}
return segmentConsumerInfo;
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 8d67bf5ce7..2658a45d99 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -90,7 +90,6 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
-import org.apache.pinot.spi.stream.PartitionLagState;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -517,8 +516,12 @@ public class TablesResource {
RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
Map<String, ConsumerPartitionState> partitionStateMap =
realtimeSegmentDataManager.getConsumerPartitionState();
- Map<String, PartitionLagState> partitionLagStateMap =
- realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap);
+ Map<String, String> recordsLagMap = new HashMap<>();
+ Map<String, String> availabilityLagMsMap = new HashMap<>();
+ realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap).forEach((k, v) -> {
+ recordsLagMap.put(k, v.getRecordsLag());
+ availabilityLagMsMap.put(k, v.getAvailabilityLagMs());
+ });
@Deprecated Map<String, String> partitiionToOffsetMap =
realtimeSegmentDataManager.getPartitionToCurrentOffset();
segmentConsumerInfoList.add(
@@ -530,11 +533,7 @@ public class TablesResource {
partitiionToOffsetMap,
partitionStateMap.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString())
- ),
- partitionLagStateMap.entrySet().stream().collect(
- Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getRecordsLag())
- )))
- );
+ ), recordsLagMap, availabilityLagMsMap)));
}
}
} catch (Exception e) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java
index 9da90d5bff..668f771077 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java
@@ -29,13 +29,15 @@ public class ConsumerPartitionState {
private final StreamPartitionMsgOffset _currentOffset;
private final long _lastProcessedTimeMs;
private final StreamPartitionMsgOffset _upstreamLatestOffset;
+ private final RowMetadata _lastProcessedRowMetadata;
public ConsumerPartitionState(String partitionId, StreamPartitionMsgOffset currentOffset, long lastProcessedTimeMs,
- @Nullable StreamPartitionMsgOffset upstreamLatestOffset) {
+ @Nullable StreamPartitionMsgOffset upstreamLatestOffset, @Nullable RowMetadata lastProcessedRowMetadata) {
_partitionId = partitionId;
_currentOffset = currentOffset;
_lastProcessedTimeMs = lastProcessedTimeMs;
_upstreamLatestOffset = upstreamLatestOffset;
+ _lastProcessedRowMetadata = lastProcessedRowMetadata;
}
public StreamPartitionMsgOffset getCurrentOffset() {
@@ -53,4 +55,8 @@ public class ConsumerPartitionState {
public StreamPartitionMsgOffset getUpstreamLatestOffset() {
return _upstreamLatestOffset;
}
+
+ public RowMetadata getLastProcessedRowMetadata() {
+ return _lastProcessedRowMetadata;
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
index 21e2222403..ead8246baf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
@@ -27,12 +27,20 @@ public class PartitionLagState {
protected final static String NOT_CALCULATED = "NOT_CALCULATED";
/**
- * Defines how far away the current record's offset / pointer is from upstream latest record
+ * Defines how far behind the current record's offset / pointer is from upstream latest record
* The distance is based on actual record count.
*/
public String getRecordsLag() {
return NOT_CALCULATED;
}
- // TODO: Define record availability lag ($latest_record_consumption_time - $latest_record_ingestion_time)
+ /**
+ * Defines how soon after record ingestion the record was consumed by Pinot. That is, the difference between the
+ * time the record was consumed and the time at which the record was ingested upstream.
+ *
+ * @return Lag value in milliseconds
+ */
+ public String getAvailabilityLagMs() {
+ return NOT_CALCULATED;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org