You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2018/01/15 00:19:03 UTC

[kafka] branch trunk updated: KAFKA-5890; records.lag should use tags for topic and partition rather than using metric name.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5d81639  KAFKA-5890; records.lag should use tags for topic and partition rather than using metric name.
5d81639 is described below

commit 5d81639907869ce7355c40d2bac176a655e52074
Author: cmolter <cm...@apple.com>
AuthorDate: Sun Jan 14 16:18:39 2018 -0800

    KAFKA-5890; records.lag should use tags for topic and partition rather than using metric name.
    
    This is the implementation of KIP-225.
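    It marks the previous per-partition lag metrics (named "{topic}-{partition}.records-lag", "-max" and "-avg") as deprecated in the documentation and adds new "records-lag", "records-lag-max" and "records-lag-avg" metrics that carry the topic and partition as tags.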
    
    Testing verifies that both the new and the old metrics report the same value.
    
    Author: cmolter <cm...@apple.com>
    
    Reviewers: Jiangjie (Becket) Qin <be...@gmail.com>
    
    Closes #4362 from lahabana/kafka-5890
---
 .../kafka/clients/consumer/internals/Fetcher.java  | 21 +++++++----
 .../consumer/internals/FetcherMetricsRegistry.java | 28 +++++++++++----
 .../clients/consumer/internals/FetcherTest.java    | 16 +++++++--
 .../kafka/api/PlaintextConsumerTest.scala          | 41 +++++++++++++++++-----
 docs/upgrade.html                                  |  1 +
 5 files changed, 85 insertions(+), 22 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4c68f1f..5dc0b26 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1327,16 +1327,25 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             String name = partitionLagMetricName(tp);
             Sensor recordsLag = this.metrics.getSensor(name);
             if (recordsLag == null) {
+                Map<String, String> metricTags = new HashMap<>(2);
+                metricTags.put("topic", tp.topic().replace('.', '_'));
+                metricTags.put("partition", String.valueOf(tp.partition()));
+
                 recordsLag = this.metrics.sensor(name);
+
+                recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag, metricTags), new Value());
+                recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagMax, metricTags), new Max());
+                recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagAvg, metricTags), new Avg());
+
                 recordsLag.add(this.metrics.metricName(name,
-                        metricsRegistry.partitionRecordsLag.group(),
-                        metricsRegistry.partitionRecordsLag.description()), new Value());
+                        metricsRegistry.partitionRecordsLagDeprecated.group(),
+                        metricsRegistry.partitionRecordsLagDeprecated.description()), new Value());
                 recordsLag.add(this.metrics.metricName(name + "-max",
-                        metricsRegistry.partitionRecordsLagMax.group(),
-                        metricsRegistry.partitionRecordsLagMax.description()), new Max());
+                        metricsRegistry.partitionRecordsLagMaxDeprecated.group(),
+                        metricsRegistry.partitionRecordsLagMaxDeprecated.description()), new Max());
                 recordsLag.add(this.metrics.metricName(name + "-avg",
-                        metricsRegistry.partitionRecordsLagAvg.group(),
-                        metricsRegistry.partitionRecordsLagAvg.description()), new Avg());
+                        metricsRegistry.partitionRecordsLagAvgDeprecated.group(),
+                        metricsRegistry.partitionRecordsLagAvgDeprecated.description()), new Avg());
             }
             recordsLag.record(lag);
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
index acf42ec..f15ac06 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
@@ -49,6 +49,10 @@ public class FetcherMetricsRegistry {
     public MetricNameTemplate partitionRecordsLag;
     public MetricNameTemplate partitionRecordsLagMax;
     public MetricNameTemplate partitionRecordsLagAvg;
+    // To remove in 2.0
+    public MetricNameTemplate partitionRecordsLagDeprecated;
+    public MetricNameTemplate partitionRecordsLagMaxDeprecated;
+    public MetricNameTemplate partitionRecordsLagAvgDeprecated;
 
     public FetcherMetricsRegistry() {
         this(new HashSet<String>(), "");
@@ -117,13 +121,22 @@ public class FetcherMetricsRegistry {
         this.topicRecordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName,
                 "The total number of records consumed for a topic", topicTags);
         
+        this.partitionRecordsLagDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName,
+                "The latest lag of the partition (DEPRECATED use the tag based version instead)", tags);
+        this.partitionRecordsLagMaxDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName,
+                "The max lag of the partition (DEPRECATED use the tag based version instead)", tags);
+        this.partitionRecordsLagAvgDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName,
+                "The average lag of the partition (DEPRECATED use the tag based version instead)", tags);
+
         /***** Partition level *****/
-        this.partitionRecordsLag = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName, 
-                "The latest lag of the partition", tags);
-        this.partitionRecordsLagMax = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName, 
-                "The max lag of the partition", tags);
-        this.partitionRecordsLagAvg = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName, 
-                "The average lag of the partition", tags);
+        Set<String> partitionTags = new HashSet<>(topicTags);
+        partitionTags.add("partition");
+        this.partitionRecordsLag = new MetricNameTemplate("records-lag", groupName,
+                "The latest lag of the partition", partitionTags);
+        this.partitionRecordsLagMax = new MetricNameTemplate("records-lag-max", groupName,
+                "The max lag of the partition", partitionTags);
+        this.partitionRecordsLagAvg = new MetricNameTemplate("records-lag-avg", groupName,
+                "The average lag of the partition", partitionTags);
         
     
     }
@@ -151,6 +164,9 @@ public class FetcherMetricsRegistry {
             topicRecordsPerRequestAvg,
             topicRecordsConsumedRate,
             topicRecordsConsumedTotal,
+            partitionRecordsLagDeprecated,
+            partitionRecordsLagAvgDeprecated,
+            partitionRecordsLagMaxDeprecated,
             partitionRecordsLag,
             partitionRecordsLagAvg,
             partitionRecordsLagMax
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7c1d9ba..26d7a50 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1171,7 +1171,10 @@ public class FetcherTest {
         subscriptions.seek(tp0, 0);
 
         MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
-        MetricName partitionLagMetric = metrics.metricName(tp0 + ".records-lag", metricGroup);
+        Map<String, String> tags = new HashMap<>();
+        tags.put("topic", tp0.topic());
+        tags.put("partition", String.valueOf(tp0.partition()));
+        MetricName partitionLagMetric = metrics.metricName("records-lag", metricGroup, tags);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
@@ -1210,7 +1213,12 @@ public class FetcherTest {
         subscriptions.seek(tp0, 0);
 
         MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
-        MetricName partitionLagMetric = metrics.metricName(tp0 + ".records-lag", metricGroup);
+
+        Map<String, String> tags = new HashMap<>();
+        tags.put("topic", tp0.topic());
+        tags.put("partition", String.valueOf(tp0.partition()));
+        MetricName partitionLagMetric = metrics.metricName("records-lag", metricGroup, tags);
+        MetricName partitionLagMetricDeprecated = metrics.metricName(tp0 + ".records-lag", metricGroup);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
@@ -1225,6 +1233,9 @@ public class FetcherTest {
         KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
         assertEquals(50, partitionLag.value(), EPSILON);
 
+        KafkaMetric partitionLagDeprecated = allMetrics.get(partitionLagMetricDeprecated);
+        assertEquals(50, partitionLagDeprecated.value(), EPSILON);
+
         // recordsFetchLagMax should be lso - offset of the last message after receiving a non-empty FetchResponse
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
                 TimestampType.CREATE_TIME, 0L);
@@ -1237,6 +1248,7 @@ public class FetcherTest {
         // verify de-registration of partition lag
         subscriptions.unsubscribe();
         assertFalse(allMetrics.containsKey(partitionLagMetric));
+        assertFalse(allMetrics.containsKey(partitionLagMetricDeprecated));
     }
 
     @Test
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 04935f8..103a28a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1398,8 +1398,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       }, "Consumer did not consume any message before timeout.")
       assertEquals("should be assigned once", 1, listener0.callsToAssigned)
       // Verify the metric exist.
-      val tags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
-      val fetchLag0 = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags))
+      val tags1 = new util.HashMap[String, String]()
+      tags1.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
+      tags1.put("topic", tp.topic())
+      tags1.put("partition", String.valueOf(tp.partition()))
+
+      val tags2 = new util.HashMap[String, String]()
+      tags2.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
+      tags2.put("topic", tp2.topic())
+      tags2.put("partition", String.valueOf(tp2.partition()))
+      val fetchLag0 = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1))
       assertNotNull(fetchLag0)
       val expectedLag = numMessages - records.count
       assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag0.value, epsilon)
@@ -1411,8 +1419,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
         listener0.callsToAssigned >= 2
       }, "Expected rebalance did not occur.")
       // Verify the metric has gone
-      assertNull(consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)))
-      assertNull(consumer.metrics.get(new MetricName(tp2 + ".records-lag", "consumer-fetch-manager-metrics", "", tags)))
+      assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)))
+      assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2)))
     } finally {
       consumer.close()
     }
@@ -1436,15 +1444,24 @@ class PlaintextConsumerTest extends BaseConsumerTest {
         !records.records(tp).isEmpty
       }, "Consumer did not consume any message before timeout.")
       // Verify the metric exist.
-      val tags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
-      val fetchLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags))
+      val tags = new util.HashMap[String, String]()
+      tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
+      tags.put("topic", tp.topic())
+      tags.put("partition", String.valueOf(tp.partition()))
+      val fetchLag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
       assertNotNull(fetchLag)
+
+      val oldTags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
+      val oldFetchLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", oldTags))
+      assertEquals(fetchLag.metricValue(), oldFetchLag.metricValue())
+
       val expectedLag = numMessages - records.count
       assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.value, epsilon)
 
       consumer.assign(List(tp2).asJava)
       TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
       assertNull(consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)))
+      assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)))
     } finally {
       consumer.close()
     }
@@ -1466,8 +1483,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
         records = consumer.poll(100)
         !records.isEmpty
       }, "Consumer did not consume any message before timeout.")
-      val tags = Collections.singletonMap("client-id", "testPerPartitionLagWithMaxPollRecords")
-      val lag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags))
+      val oldTags = Collections.singletonMap("client-id", "testPerPartitionLagWithMaxPollRecords")
+      val oldLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", oldTags))
+
+      val tags = new util.HashMap[String, String]()
+      tags.put("client-id", "testPerPartitionLagWithMaxPollRecords")
+      tags.put("topic", tp.topic())
+      tags.put("partition", String.valueOf(tp.partition()))
+      val lag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
+      assertEquals(oldLag.metricValue(), lag.metricValue())
+
       assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count, lag.value, epsilon)
     } finally {
       consumer.close()
diff --git a/docs/upgrade.html b/docs/upgrade.html
index b8b88b9..6140059 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -66,6 +66,7 @@
     <li>The kafka artifact in Maven no longer depends on log4j or slf4j-log4j12. Similarly to the kafka-clients artifact, users
         can now choose the logging back-end by including the appropriate slf4j module (slf4j-log4j12, logback, etc.). The release
         tarball still includes log4j and slf4j-log4j12.</li>
+    <li><a href="https://cwiki.apache.org/confluence/x/uaBzB">KIP-225</a> changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" is deprecated and will be removed in 2.0.0.</li>
 </ul>
 
 <h5><a id="upgrade_110_new_protocols" href="#upgrade_110_new_protocols">New Protocol Versions</a></h5>

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <co...@kafka.apache.org>.