You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/02/06 18:40:55 UTC

[kafka] branch trunk updated: KAFKA-6184; report a metric of the lag between the consumer offset ...

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7c97b23  KAFKA-6184; report a metric of the lag between the consumer offset ...
7c97b23 is described below

commit 7c97b239a5261cf030984f71427287196e657d95
Author: huxihx <hu...@hotmail.com>
AuthorDate: Tue Feb 6 10:40:51 2018 -0800

    KAFKA-6184; report a metric of the lag between the consumer offset ...
    
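    Add the client-level `records-lead-min` metric (sensor `records-lead`) and the partition-level `records-lead`, `records-lead-min` and `records-lead-avg` metrics (tagged by topic and partition) to the consumer fetcher metrics.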
    
    junrao  Please kindly review. Thanks.
    
    Author: huxihx <hu...@hotmail.com>
    
    Reviewers: Jun Rao <ju...@gmail.com>
    
    Closes #4191 from huxihx/KAFKA-6184
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  47 ++++++++-
 .../consumer/internals/FetcherMetricsRegistry.java |  19 +++-
 .../consumer/internals/SubscriptionState.java      |  11 ++
 .../clients/consumer/internals/FetcherTest.java    |  54 ++++++++++
 .../kafka/api/PlaintextConsumerTest.scala          | 111 +++++++++++++++++++++
 5 files changed, 237 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 32782ee..0711d15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.BufferSupplier;
@@ -587,6 +588,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 if (partitionLag != null)
                     this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
 
+                Long lead = subscriptions.partitionLead(partitionRecords.partition);
+                if (lead != null) {
+                    this.sensors.recordPartitionLead(partitionRecords.partition, lead);
+                }
+
                 return partRecords;
             } else {
                 // these records aren't next in line based on the last consumed position, ignore them
@@ -871,6 +877,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     subscriptions.updateHighWatermark(tp, partition.highWatermark);
                 }
 
+                if (partition.logStartOffset >= 0) {
+                    log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);
+                    subscriptions.updateLogStartOffset(tp, partition.logStartOffset);
+                }
+
                 if (partition.lastStableOffset >= 0) {
                     log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                     subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
@@ -945,7 +956,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
 
     @Override
     public void onAssignment(Set<TopicPartition> assignment) {
-        sensors.updatePartitionLagSensors(assignment);
+        sensors.updatePartitionLagAndLeadSensors(assignment);
     }
 
     public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
@@ -1261,6 +1272,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         private final Sensor recordsFetched;
         private final Sensor fetchLatency;
         private final Sensor recordsFetchLag;
+        private final Sensor recordsFetchLead;
 
         private Set<TopicPartition> assignedPartitions;
 
@@ -1287,6 +1299,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
 
             this.recordsFetchLag = metrics.sensor("records-lag");
             this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max());
+
+            this.recordsFetchLead = metrics.sensor("records-lead");
+            this.recordsFetchLead.add(metrics.metricInstance(metricsRegistry.recordsLeadMin), new Min());
         }
 
         private void recordTopicFetchMetrics(String topic, int bytes, int records) {
@@ -1322,16 +1337,37 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             recordsFetched.record(records);
         }
 
-        private void updatePartitionLagSensors(Set<TopicPartition> assignedPartitions) {
+        private void updatePartitionLagAndLeadSensors(Set<TopicPartition> assignedPartitions) {
             if (this.assignedPartitions != null) {
                 for (TopicPartition tp : this.assignedPartitions) {
-                    if (!assignedPartitions.contains(tp))
+                    if (!assignedPartitions.contains(tp)) {
                         metrics.removeSensor(partitionLagMetricName(tp));
+                        metrics.removeSensor(partitionLeadMetricName(tp));
+                    }
                 }
             }
             this.assignedPartitions = assignedPartitions;
         }
 
+        private void recordPartitionLead(TopicPartition tp, long lead) {
+            this.recordsFetchLead.record(lead);
+
+            String name = partitionLeadMetricName(tp);
+            Sensor recordsLead = this.metrics.getSensor(name);
+            if (recordsLead == null) {
+                Map<String, String> metricTags = new HashMap<>(2);
+                metricTags.put("topic", tp.topic().replace('.', '_'));
+                metricTags.put("partition", String.valueOf(tp.partition()));
+
+                recordsLead = this.metrics.sensor(name);
+
+                recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLead, metricTags), new Value());
+                recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadMin, metricTags), new Min());
+                recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadAvg, metricTags), new Avg());
+            }
+            recordsLead.record(lead);
+        }
+
         private void recordPartitionLag(TopicPartition tp, long lag) {
             this.recordsFetchLag.record(lag);
 
@@ -1364,6 +1400,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         private static String partitionLagMetricName(TopicPartition tp) {
             return tp + ".records-lag";
         }
+
+        private static String partitionLeadMetricName(TopicPartition tp) {
+            return tp + ".records-lead";
+        }
+
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
index 301363a..6eb4fa2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
@@ -38,6 +38,7 @@ public class FetcherMetricsRegistry {
     public MetricNameTemplate fetchRequestRate;
     public MetricNameTemplate fetchRequestTotal;
     public MetricNameTemplate recordsLagMax;
+    public MetricNameTemplate recordsLeadMin;
     public MetricNameTemplate fetchThrottleTimeAvg;
     public MetricNameTemplate fetchThrottleTimeMax;
     public MetricNameTemplate topicFetchSizeAvg;
@@ -50,6 +51,9 @@ public class FetcherMetricsRegistry {
     public MetricNameTemplate partitionRecordsLag;
     public MetricNameTemplate partitionRecordsLagMax;
     public MetricNameTemplate partitionRecordsLagAvg;
+    public MetricNameTemplate partitionRecordsLead;
+    public MetricNameTemplate partitionRecordsLeadMin;
+    public MetricNameTemplate partitionRecordsLeadAvg;
     // To remove in 2.0
     public MetricNameTemplate partitionRecordsLagDeprecated;
     public MetricNameTemplate partitionRecordsLagMaxDeprecated;
@@ -96,6 +100,8 @@ public class FetcherMetricsRegistry {
 
         this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName, 
                 "The maximum lag in terms of number of records for any partition in this window", tags);
+        this.recordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
+                "The minimum lead in terms of number of records for any partition in this window", tags);
 
         this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName, 
                 "The average throttle time in ms", tags);
@@ -138,7 +144,12 @@ public class FetcherMetricsRegistry {
                 "The max lag of the partition", partitionTags);
         this.partitionRecordsLagAvg = new MetricNameTemplate("records-lag-avg", groupName,
                 "The average lag of the partition", partitionTags);
-        
+        this.partitionRecordsLead = new MetricNameTemplate("records-lead", groupName,
+                "The latest lead of the partition", partitionTags);
+        this.partitionRecordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
+                "The min lead of the partition", partitionTags);
+        this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName,
+                "The average lead of the partition", partitionTags);
     
     }
     
@@ -156,6 +167,7 @@ public class FetcherMetricsRegistry {
             fetchRequestRate,
             fetchRequestTotal,
             recordsLagMax,
+            recordsLeadMin,
             fetchThrottleTimeAvg,
             fetchThrottleTimeMax,
             topicFetchSizeAvg,
@@ -170,7 +182,10 @@ public class FetcherMetricsRegistry {
             partitionRecordsLagMaxDeprecated,
             partitionRecordsLag,
             partitionRecordsLagAvg,
-            partitionRecordsLagMax
+            partitionRecordsLagMax,
+            partitionRecordsLead,
+            partitionRecordsLeadMin,
+            partitionRecordsLeadAvg
         );
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index adced58..8228707 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -327,10 +327,19 @@ public class SubscriptionState {
             return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position;
     }
 
+    public Long partitionLead(TopicPartition tp) {
+        TopicPartitionState topicPartitionState = assignedState(tp);
+        return topicPartitionState.logStartOffset == null ? null : topicPartitionState.position - topicPartitionState.logStartOffset;
+    }
+
     public void updateHighWatermark(TopicPartition tp, long highWatermark) {
         assignedState(tp).highWatermark = highWatermark;
     }
 
+    public void updateLogStartOffset(TopicPartition tp, long logStartOffset) {
+        assignedState(tp).logStartOffset = logStartOffset;
+    }
+
     public void updateLastStableOffset(TopicPartition tp, long lastStableOffset) {
         assignedState(tp).lastStableOffset = lastStableOffset;
     }
@@ -435,6 +444,7 @@ public class SubscriptionState {
     private static class TopicPartitionState {
         private Long position; // last consumed position
         private Long highWatermark; // the high watermark from last fetch
+        private Long logStartOffset; // the log start offset
         private Long lastStableOffset;
         private OffsetAndMetadata committed;  // last committed position
         private boolean paused;  // whether this partition has been paused by the user
@@ -444,6 +454,7 @@ public class SubscriptionState {
             this.paused = false;
             this.position = null;
             this.highWatermark = null;
+            this.logStartOffset = null;
             this.lastStableOffset = null;
             this.committed = null;
             this.resetStrategy = null;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index a0205e7..e81e3d1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1211,6 +1211,45 @@ public class FetcherTest {
     }
 
     @Test
+    public void testFetcherLeadMetric() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        MetricName minLeadMetric = metrics.metricInstance(metricsRegistry.recordsLeadMin);
+        Map<String, String> tags = new HashMap<>(2);
+        tags.put("topic", tp0.topic());
+        tags.put("partition", String.valueOf(tp0.partition()));
+        MetricName partitionLeadMetric = metrics.metricName("records-lead", metricGroup, "", tags);
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric recordsFetchLeadMin = allMetrics.get(minLeadMetric);
+
+        // recordsFetchLeadMin should be initialized to MAX_VALUE
+        assertEquals(Double.MAX_VALUE, recordsFetchLeadMin.value(), EPSILON);
+
+        // recordsFetchLeadMin should be position - logStartOffset after receiving an empty FetchResponse
+        fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, -1L, 0L, 0);
+        assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
+
+        KafkaMetric partitionLead = allMetrics.get(partitionLeadMetric);
+        assertEquals(0L, partitionLead.value(), EPSILON);
+
+        // recordsFetchLeadMin should be position - logStartOffset after receiving a non-empty FetchResponse
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int v = 0; v < 3; v++) {
+            builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+        }
+        fetchRecords(tp0, builder.build(), Errors.NONE, 200L, -1L, 0L, 0);
+        assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
+        assertEquals(3L, partitionLead.value(), EPSILON);
+
+        // verify de-registration of partition lead
+        subscriptions.unsubscribe();
+        assertFalse(allMetrics.containsKey(partitionLeadMetric));
+    }
+
+    @Test
     public void testReadCommittedLagMetric() {
         Metrics metrics = new Metrics();
         fetcher = createFetcher(subscriptions, metrics, new ByteArrayDeserializer(),
@@ -1430,6 +1469,14 @@ public class FetcherTest {
         return fetcher.fetchedRecords();
     }
 
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(
+            TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, long logStartOffset, int throttleTime) {
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fetchResponse(tp, records, error, hw, lastStableOffset, logStartOffset, throttleTime));
+        consumerClient.poll(0);
+        return fetcher.fetchedRecords();
+    }
+
     @Test
     public void testGetOffsetsForTimesTimeout() {
         try {
@@ -2132,6 +2179,13 @@ public class FetcherTest {
         return new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
     }
 
+    private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
+                                        long lastStableOffset, long logStartOffset, int throttleTime) {
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
+                new FetchResponse.PartitionData(error, hw, lastStableOffset, logStartOffset, null, records));
+        return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
+    }
+
     private MetadataResponse newMetadataResponse(String topic, Errors error) {
         List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
         if (error == Errors.NONE) {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 103a28a..a06e9e3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1375,6 +1375,54 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(500, consumer0.committed(tp2).offset)
   }
 
+  @Test
+  def testPerPartitionLeadMetricsCleanUpWithSubscribe() {
+    val numMessages = 1000
+    val topic2 = "topic2"
+    createTopic(topic2, 2, serverCount)
+    // send some messages.
+    sendRecords(numMessages, tp)
+    // Test subscribe
+    // Create a consumer and consume some messages.
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    try {
+      val listener0 = new TestConsumerReassignmentListener
+      consumer.subscribe(List(topic, topic2).asJava, listener0)
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+      TestUtils.waitUntilTrue(() => {
+        records = consumer.poll(100)
+        !records.records(tp).isEmpty
+      }, "Consumer did not consume any message before timeout.")
+      assertEquals("should be assigned once", 1, listener0.callsToAssigned)
+      // Verify the metric exists.
+      val tags1 = new util.HashMap[String, String]()
+      tags1.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+      tags1.put("topic", tp.topic())
+      tags1.put("partition", String.valueOf(tp.partition()))
+
+      val tags2 = new util.HashMap[String, String]()
+      tags2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+      tags2.put("topic", tp2.topic())
+      tags2.put("partition", String.valueOf(tp2.partition()))
+      val fetchLead0 = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1))
+      assertNotNull(fetchLead0)
+      assertTrue(s"The lead should be ${records.count}", fetchLead0.metricValue() == records.count)
+
+      // Remove topic from subscription
+      consumer.subscribe(List(topic2).asJava, listener0)
+      TestUtils.waitUntilTrue(() => {
+        consumer.poll(100)
+        listener0.callsToAssigned >= 2
+      }, "Expected rebalance did not occur.")
+      // Verify the metric has gone
+      assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)))
+      assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2)))
+    } finally {
+      consumer.close()
+    }
+  }
 
   @Test
   def testPerPartitionLagMetricsCleanUpWithSubscribe() {
@@ -1427,6 +1475,41 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
+  def testPerPartitionLeadMetricsCleanUpWithAssign() {
+    val numMessages = 1000
+    // Test assign
+    // send some messages.
+    sendRecords(numMessages, tp)
+    sendRecords(numMessages, tp2)
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign")
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    try {
+      consumer.assign(List(tp).asJava)
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+      TestUtils.waitUntilTrue(() => {
+        records = consumer.poll(100)
+        !records.records(tp).isEmpty
+      }, "Consumer did not consume any message before timeout.")
+      // Verify the metric exists.
+      val tags = new util.HashMap[String, String]()
+      tags.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign")
+      tags.put("topic", tp.topic())
+      tags.put("partition", String.valueOf(tp.partition()))
+      val fetchLead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
+      assertNotNull(fetchLead)
+
+      assertTrue(s"The lead should be ${records.count}", records.count == fetchLead.metricValue())
+
+      consumer.assign(List(tp2).asJava)
+      TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
+      assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)))
+    } finally {
+      consumer.close()
+    }
+  }
+
+  @Test
   def testPerPartitionLagMetricsCleanUpWithAssign() {
     val numMessages = 1000
     // Test assign
@@ -1468,6 +1551,34 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
+  def testPerPartitionLeadWithMaxPollRecords() {
+    val numMessages = 1000
+    val maxPollRecords = 10
+    sendRecords(numMessages, tp)
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords")
+    consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumer.assign(List(tp).asJava)
+    try {
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+      TestUtils.waitUntilTrue(() => {
+        records = consumer.poll(100)
+        !records.isEmpty
+      }, "Consumer did not consume any message before timeout.")
+
+      val tags = new util.HashMap[String, String]()
+      tags.put("client-id", "testPerPartitionLeadWithMaxPollRecords")
+      tags.put("topic", tp.topic())
+      tags.put("partition", String.valueOf(tp.partition()))
+      val lead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
+      assertTrue(s"The lead should be $maxPollRecords", lead.metricValue() == maxPollRecords)
+    } finally {
+      consumer.close()
+    }
+  }
+
+  @Test
   def testPerPartitionLagWithMaxPollRecords() {
     val numMessages = 1000
     val maxPollRecords = 10

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.