You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/06 18:43:00 UTC

[jira] [Commented] (KAFKA-6184) report a metric of the lag between the consumer offset and the start offset of the log

    [ https://issues.apache.org/jira/browse/KAFKA-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354323#comment-16354323 ] 

ASF GitHub Bot commented on KAFKA-6184:
---------------------------------------

junrao closed pull request #4191: KAFKA-6184: report a metric of the lag between the consumer offset ...
URL: https://github.com/apache/kafka/pull/4191
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5dc0b26f8fb..55b7bbb9a8c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -44,6 +44,7 @@
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.BufferSupplier;
@@ -578,6 +579,11 @@ private long endTimestamp() {
                 if (partitionLag != null)
                     this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
 
+                Long lead = subscriptions.partitionLead(partitionRecords.partition);
+                if (lead != null) {
+                    this.sensors.recordPartitionLead(partitionRecords.partition, lead);
+                }
+
                 return partRecords;
             } else {
                 // these records aren't next in line based on the last consumed position, ignore them
@@ -860,6 +866,11 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
                     subscriptions.updateHighWatermark(tp, partition.highWatermark);
                 }
 
+                if (partition.logStartOffset >= 0) {
+                    log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);
+                    subscriptions.updateLogStartOffset(tp, partition.logStartOffset);
+                }
+
                 if (partition.lastStableOffset >= 0) {
                     log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                     subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
@@ -934,7 +945,7 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
 
     @Override
     public void onAssignment(Set<TopicPartition> assignment) {
-        sensors.updatePartitionLagSensors(assignment);
+        sensors.updatePartitionLagAndLeadSensors(assignment);
     }
 
     public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
@@ -1250,6 +1261,7 @@ protected void increment(int bytes, int records) {
         private final Sensor recordsFetched;
         private final Sensor fetchLatency;
         private final Sensor recordsFetchLag;
+        private final Sensor recordsFetchLead;
 
         private Set<TopicPartition> assignedPartitions;
 
@@ -1276,6 +1288,9 @@ private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegis
 
             this.recordsFetchLag = metrics.sensor("records-lag");
             this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max());
+
+            this.recordsFetchLead = metrics.sensor("records-lead");
+            this.recordsFetchLead.add(metrics.metricInstance(metricsRegistry.recordsLeadMin), new Min());
         }
 
         private void recordTopicFetchMetrics(String topic, int bytes, int records) {
@@ -1311,16 +1326,37 @@ private void recordTopicFetchMetrics(String topic, int bytes, int records) {
             recordsFetched.record(records);
         }
 
-        private void updatePartitionLagSensors(Set<TopicPartition> assignedPartitions) {
+        private void updatePartitionLagAndLeadSensors(Set<TopicPartition> assignedPartitions) {
             if (this.assignedPartitions != null) {
                 for (TopicPartition tp : this.assignedPartitions) {
-                    if (!assignedPartitions.contains(tp))
+                    if (!assignedPartitions.contains(tp)) {
                         metrics.removeSensor(partitionLagMetricName(tp));
+                        metrics.removeSensor(partitionLeadMetricName(tp));
+                    }
                 }
             }
             this.assignedPartitions = assignedPartitions;
         }
 
+        private void recordPartitionLead(TopicPartition tp, long lead) {
+            this.recordsFetchLead.record(lead);
+
+            String name = partitionLeadMetricName(tp);
+            Sensor recordsLead = this.metrics.getSensor(name);
+            if (recordsLead == null) {
+                Map<String, String> metricTags = new HashMap<>(2);
+                metricTags.put("topic", tp.topic().replace('.', '_'));
+                metricTags.put("partition", String.valueOf(tp.partition()));
+
+                recordsLead = this.metrics.sensor(name);
+
+                recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLead, metricTags), new Value());
+                recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadMin, metricTags), new Min());
+                recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadAvg, metricTags), new Avg());
+            }
+            recordsLead.record(lead);
+        }
+
         private void recordPartitionLag(TopicPartition tp, long lag) {
             this.recordsFetchLag.record(lag);
 
@@ -1353,6 +1389,11 @@ private void recordPartitionLag(TopicPartition tp, long lag) {
         private static String partitionLagMetricName(TopicPartition tp) {
             return tp + ".records-lag";
         }
+
+        private static String partitionLeadMetricName(TopicPartition tp) {
+            return tp + ".records-lead";
+        }
+
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
index f15ac062abf..b75d0cb24d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
@@ -37,6 +37,7 @@
     public MetricNameTemplate fetchRequestRate;
     public MetricNameTemplate fetchRequestTotal;
     public MetricNameTemplate recordsLagMax;
+    public MetricNameTemplate recordsLeadMin;
     public MetricNameTemplate fetchThrottleTimeAvg;
     public MetricNameTemplate fetchThrottleTimeMax;
     public MetricNameTemplate topicFetchSizeAvg;
@@ -49,6 +50,9 @@
     public MetricNameTemplate partitionRecordsLag;
     public MetricNameTemplate partitionRecordsLagMax;
     public MetricNameTemplate partitionRecordsLagAvg;
+    public MetricNameTemplate partitionRecordsLead;
+    public MetricNameTemplate partitionRecordsLeadMin;
+    public MetricNameTemplate partitionRecordsLeadAvg;
     // To remove in 2.0
     public MetricNameTemplate partitionRecordsLagDeprecated;
     public MetricNameTemplate partitionRecordsLagMaxDeprecated;
@@ -95,6 +99,8 @@ public FetcherMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
 
         this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName, 
                 "The maximum lag in terms of number of records for any partition in this window", tags);
+        this.recordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
+                "The minimum lead in terms of number of records for any partition in this window", tags);
 
         this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName, 
                 "The average throttle time in ms", tags);
@@ -137,7 +143,12 @@ public FetcherMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
                 "The max lag of the partition", partitionTags);
         this.partitionRecordsLagAvg = new MetricNameTemplate("records-lag-avg", groupName,
                 "The average lag of the partition", partitionTags);
-        
+        this.partitionRecordsLead = new MetricNameTemplate("records-lead", groupName,
+                "The latest lead of the partition", partitionTags);
+        this.partitionRecordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
+                "The min lead of the partition", partitionTags);
+        this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName,
+                "The average lead of the partition", partitionTags);
     
     }
     
@@ -155,6 +166,7 @@ public FetcherMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
             fetchRequestRate,
             fetchRequestTotal,
             recordsLagMax,
+            recordsLeadMin,
             fetchThrottleTimeAvg,
             fetchThrottleTimeMax,
             topicFetchSizeAvg,
@@ -169,7 +181,10 @@ public FetcherMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
             partitionRecordsLagMaxDeprecated,
             partitionRecordsLag,
             partitionRecordsLagAvg,
-            partitionRecordsLagMax
+            partitionRecordsLagMax,
+            partitionRecordsLead,
+            partitionRecordsLeadMin,
+            partitionRecordsLeadAvg
         );
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index adced58d44f..822870713f4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -327,10 +327,19 @@ public Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) {
             return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position;
     }
 
+    public Long partitionLead(TopicPartition tp) {
+        TopicPartitionState topicPartitionState = assignedState(tp);
+        return topicPartitionState.logStartOffset == null ? null : topicPartitionState.position - topicPartitionState.logStartOffset;
+    }
+
     public void updateHighWatermark(TopicPartition tp, long highWatermark) {
         assignedState(tp).highWatermark = highWatermark;
     }
 
+    public void updateLogStartOffset(TopicPartition tp, long logStartOffset) {
+        assignedState(tp).logStartOffset = logStartOffset;
+    }
+
     public void updateLastStableOffset(TopicPartition tp, long lastStableOffset) {
         assignedState(tp).lastStableOffset = lastStableOffset;
     }
@@ -435,6 +444,7 @@ public void fireOnAssignment(Set<TopicPartition> assignment) {
     private static class TopicPartitionState {
         private Long position; // last consumed position
         private Long highWatermark; // the high watermark from last fetch
+        private Long logStartOffset; // the log start offset
         private Long lastStableOffset;
         private OffsetAndMetadata committed;  // last committed position
         private boolean paused;  // whether this partition has been paused by the user
@@ -444,6 +454,7 @@ public TopicPartitionState() {
             this.paused = false;
             this.position = null;
             this.highWatermark = null;
+            this.logStartOffset = null;
             this.lastStableOffset = null;
             this.committed = null;
             this.resetStrategy = null;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 26d7a50cf60..fc0dc054dde 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1203,6 +1203,45 @@ public void testFetcherMetrics() {
         assertFalse(allMetrics.containsKey(partitionLagMetric));
     }
 
+    @Test
+    public void testFetcherLeadMetric() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        MetricName minLeadMetric = metrics.metricInstance(metricsRegistry.recordsLeadMin);
+        Map<String, String> tags = new HashMap<>(2);
+        tags.put("topic", tp0.topic());
+        tags.put("partition", String.valueOf(tp0.partition()));
+        MetricName partitionLeadMetric = metrics.metricName("records-lead", metricGroup, "", tags);
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric recordsFetchLeadMin = allMetrics.get(minLeadMetric);
+
+        // recordsFetchLeadMin should be initialized to MAX_VALUE
+        assertEquals(Double.MAX_VALUE, recordsFetchLeadMin.value(), EPSILON);
+
+        // recordsFetchLeadMin should be position - logStartOffset after receiving an empty FetchResponse
+        fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, -1L, 0L, 0);
+        assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
+
+        KafkaMetric partitionLead = allMetrics.get(partitionLeadMetric);
+        assertEquals(0L, partitionLead.value(), EPSILON);
+
+        // recordsFetchLeadMin should be position - logStartOffset after receiving a non-empty FetchResponse
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int v = 0; v < 3; v++) {
+            builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+        }
+        fetchRecords(tp0, builder.build(), Errors.NONE, 200L, -1L, 0L, 0);
+        assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
+        assertEquals(3L, partitionLead.value(), EPSILON);
+
+        // verify de-registration of partition lead
+        subscriptions.unsubscribe();
+        assertFalse(allMetrics.containsKey(partitionLeadMetric));
+    }
+
     @Test
     public void testReadCommittedLagMetric() {
         Metrics metrics = new Metrics();
@@ -1421,6 +1460,14 @@ public void testFetcherMetricsTemplates() throws Exception {
         return fetcher.fetchedRecords();
     }
 
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(
+            TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, long logStartOffset, int throttleTime) {
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fetchResponse(tp, records, error, hw, lastStableOffset, logStartOffset, throttleTime));
+        consumerClient.poll(0);
+        return fetcher.fetchedRecords();
+    }
+
     @Test
     public void testGetOffsetsForTimesTimeout() {
         try {
@@ -2044,6 +2091,13 @@ private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Er
         return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
     }
 
+    private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
+                                        long lastStableOffset, long logStartOffset, int throttleTime) {
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
+                new FetchResponse.PartitionData(error, hw, lastStableOffset, logStartOffset, null, records));
+        return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
+    }
+
     private MetadataResponse newMetadataResponse(String topic, Errors error) {
         List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
         if (error == Errors.NONE) {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 103a28ae67b..a06e9e36528 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1375,6 +1375,54 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(500, consumer0.committed(tp2).offset)
   }
 
+  @Test
+  def testPerPartitionLeadMetricsCleanUpWithSubscribe() {
+    val numMessages = 1000
+    val topic2 = "topic2"
+    createTopic(topic2, 2, serverCount)
+    // send some messages.
+    sendRecords(numMessages, tp)
+    // Test subscribe
+    // Create a consumer and consume some messages.
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    try {
+      val listener0 = new TestConsumerReassignmentListener
+      consumer.subscribe(List(topic, topic2).asJava, listener0)
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+      TestUtils.waitUntilTrue(() => {
+        records = consumer.poll(100)
+        !records.records(tp).isEmpty
+      }, "Consumer did not consume any message before timeout.")
+      assertEquals("should be assigned once", 1, listener0.callsToAssigned)
+      // Verify the metric exists.
+      val tags1 = new util.HashMap[String, String]()
+      tags1.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+      tags1.put("topic", tp.topic())
+      tags1.put("partition", String.valueOf(tp.partition()))
+
+      val tags2 = new util.HashMap[String, String]()
+      tags2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+      tags2.put("topic", tp2.topic())
+      tags2.put("partition", String.valueOf(tp2.partition()))
+      val fetchLead0 = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1))
+      assertNotNull(fetchLead0)
+      assertTrue(s"The lead should be ${records.count}", fetchLead0.metricValue() == records.count)
+
+      // Remove topic from subscription
+      consumer.subscribe(List(topic2).asJava, listener0)
+      TestUtils.waitUntilTrue(() => {
+        consumer.poll(100)
+        listener0.callsToAssigned >= 2
+      }, "Expected rebalance did not occur.")
+      // Verify the metric has gone
+      assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)))
+      assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2)))
+    } finally {
+      consumer.close()
+    }
+  }
 
   @Test
   def testPerPartitionLagMetricsCleanUpWithSubscribe() {
@@ -1426,6 +1474,41 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testPerPartitionLeadMetricsCleanUpWithAssign() {
+    val numMessages = 1000
+    // Test assign
+    // send some messages.
+    sendRecords(numMessages, tp)
+    sendRecords(numMessages, tp2)
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign")
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    try {
+      consumer.assign(List(tp).asJava)
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+      TestUtils.waitUntilTrue(() => {
+        records = consumer.poll(100)
+        !records.records(tp).isEmpty
+      }, "Consumer did not consume any message before timeout.")
+      // Verify the metric exists.
+      val tags = new util.HashMap[String, String]()
+      tags.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign")
+      tags.put("topic", tp.topic())
+      tags.put("partition", String.valueOf(tp.partition()))
+      val fetchLead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
+      assertNotNull(fetchLead)
+
+      assertTrue(s"The lead should be ${records.count}", records.count == fetchLead.metricValue())
+
+      consumer.assign(List(tp2).asJava)
+      TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
+      assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)))
+    } finally {
+      consumer.close()
+    }
+  }
+
   @Test
   def testPerPartitionLagMetricsCleanUpWithAssign() {
     val numMessages = 1000
@@ -1467,6 +1550,34 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testPerPartitionLeadWithMaxPollRecords() {
+    val numMessages = 1000
+    val maxPollRecords = 10
+    sendRecords(numMessages, tp)
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords")
+    consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumer.assign(List(tp).asJava)
+    try {
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+      TestUtils.waitUntilTrue(() => {
+        records = consumer.poll(100)
+        !records.isEmpty
+      }, "Consumer did not consume any message before timeout.")
+
+      val tags = new util.HashMap[String, String]()
+      tags.put("client-id", "testPerPartitionLeadWithMaxPollRecords")
+      tags.put("topic", tp.topic())
+      tags.put("partition", String.valueOf(tp.partition()))
+      val lead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
+      assertTrue(s"The lead should be $maxPollRecords", lead.metricValue() == maxPollRecords)
+    } finally {
+      consumer.close()
+    }
+  }
+
   @Test
   def testPerPartitionLagWithMaxPollRecords() {
     val numMessages = 1000


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> report a metric of the lag between the consumer offset and the start offset of the log
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6184
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6184
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 1.0.0
>            Reporter: Jun Rao
>            Assignee: huxihx
>            Priority: Major
>              Labels: needs-kip
>             Fix For: 1.2.0
>
>
> Currently, the consumer reports a metric of the lag between the high watermark of a log and the consumer offset. It will be useful to report a similar lag metric between the consumer offset and the start offset of the log. If this latter lag gets close to 0, it's an indication that the consumer may lose data soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)