Posted to commits@kafka.apache.org by li...@apache.org on 2018/06/12 06:33:08 UTC

[kafka] branch 2.0 updated: MINOR: Remove deprecated per-partition lag metrics

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 743d64c  MINOR: Remove deprecated per-partition lag metrics
743d64c is described below

commit 743d64c22cf20a265c080233dc6d19d948976533
Author: Dong Lin <li...@gmail.com>
AuthorDate: Mon Jun 11 23:32:30 2018 -0700

    MINOR: Remove deprecated per-partition lag metrics
    
    Instantiating an MBean with n attributes takes O(n^2) time, which can be very slow when the MBean has many attributes. This PR removes the metrics whose attribute count grows with the number of partitions in the cluster, which fixes the performance issue. These metrics were already marked for removal in 2.0 by KIP-225.
    
    Author: Dong Lin <li...@gmail.com>
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
    
    Closes #5172 from lindong28/remove-deprecated-metrics
    
    (cherry picked from commit 4580d9f16aabc44f3d97931c6bfa29de1e40bf2d)
    Signed-off-by: Dong Lin <li...@gmail.com>
---
 .../kafka/clients/consumer/internals/Fetcher.java  | 10 ----
 .../consumer/internals/FetcherMetricsRegistry.java | 54 ++++++++--------------
 .../clients/consumer/internals/FetcherTest.java    |  5 --
 .../kafka/api/PlaintextConsumerTest.scala          | 29 +++++-------
 docs/upgrade.html                                  |  1 +
 5 files changed, 32 insertions(+), 67 deletions(-)
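
Note (illustration only, not part of this commit): the sketch below models why the removed naming scheme inflates a single MBean while the tag-based scheme does not. It assumes a JMX reporter that creates one MBean per (group, tag set) and exposes each metric name as an attribute of that MBean. The class LagMetricNaming, its helper methods, and the "kafka.consumer:type=..." name layout are hypothetical stand-ins chosen for the example; only the metric names and the "{topic}-{partition}.records-lag" format come from the patch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LagMetricNaming {
    static final String GROUP = "consumer-fetch-manager-metrics";
    static final String[] SUFFIXES = {"records-lag", "records-lag-max", "records-lag-avg"};

    // Assumption: one MBean per (group, tag set); metric names become its attributes.
    static String mbeanName(Map<String, String> tags) {
        StringBuilder sb = new StringBuilder("kafka.consumer:type=").append(GROUP);
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            sb.append(',').append(tag.getKey()).append('=').append(tag.getValue());
        }
        return sb.toString();
    }

    // Removed scheme: topic and partition are embedded in the metric name, so every
    // partition adds three attributes to the same client-level MBean.
    static Map<String, List<String>> nameBased(String clientId, String topic, int partitions) {
        Map<String, List<String>> mbeans = new LinkedHashMap<>();
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("client-id", clientId);
        for (int p = 0; p < partitions; p++) {
            for (String suffix : SUFFIXES) {
                mbeans.computeIfAbsent(mbeanName(tags), k -> new ArrayList<>())
                      .add(topic + "-" + p + "." + suffix);
            }
        }
        return mbeans;
    }

    // Retained scheme: topic and partition are tags, so each partition gets its own
    // MBean with a fixed set of three attributes.
    static Map<String, List<String>> tagBased(String clientId, String topic, int partitions) {
        Map<String, List<String>> mbeans = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++) {
            Map<String, String> tags = new LinkedHashMap<>();
            tags.put("client-id", clientId);
            tags.put("topic", topic);
            tags.put("partition", String.valueOf(p));
            for (String suffix : SUFFIXES) {
                mbeans.computeIfAbsent(mbeanName(tags), k -> new ArrayList<>()).add(suffix);
            }
        }
        return mbeans;
    }

    public static void main(String[] args) {
        // Print the MBean-to-attributes mapping under each scheme for three partitions.
        System.out.println(nameBased("client-1", "orders", 3));
        System.out.println(tagBased("client-1", "orders", 3));
    }
}
```

Under these assumptions, the name-based scheme keeps a single MBean whose attribute count is three times the partition count, whereas the tag-based scheme yields one three-attribute MBean per partition. That is the growth pattern the commit message identifies as the source of the O(n^2) instantiation cost.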

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ca8e0d2..fd52cb6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1447,16 +1447,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag, metricTags), new Value());
                 recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagMax, metricTags), new Max());
                 recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagAvg, metricTags), new Avg());
-
-                recordsLag.add(this.metrics.metricName(name,
-                        metricsRegistry.partitionRecordsLagDeprecated.group(),
-                        metricsRegistry.partitionRecordsLagDeprecated.description()), new Value());
-                recordsLag.add(this.metrics.metricName(name + "-max",
-                        metricsRegistry.partitionRecordsLagMaxDeprecated.group(),
-                        metricsRegistry.partitionRecordsLagMaxDeprecated.description()), new Max());
-                recordsLag.add(this.metrics.metricName(name + "-avg",
-                        metricsRegistry.partitionRecordsLagAvgDeprecated.group(),
-                        metricsRegistry.partitionRecordsLagAvgDeprecated.description()), new Avg());
             }
             recordsLag.record(lag);
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
index 6eb4fa2..f869615 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
@@ -54,10 +54,6 @@ public class FetcherMetricsRegistry {
     public MetricNameTemplate partitionRecordsLead;
     public MetricNameTemplate partitionRecordsLeadMin;
     public MetricNameTemplate partitionRecordsLeadAvg;
-    // To remove in 2.0
-    public MetricNameTemplate partitionRecordsLagDeprecated;
-    public MetricNameTemplate partitionRecordsLagMaxDeprecated;
-    public MetricNameTemplate partitionRecordsLagAvgDeprecated;
 
     public FetcherMetricsRegistry() {
         this(new HashSet<String>(), "");
@@ -68,72 +64,65 @@ public class FetcherMetricsRegistry {
     }
 
     public FetcherMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
-        
+
         /***** Client level *****/
         String groupName = metricGrpPrefix + "-fetch-manager-metrics";
-                
-        this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, 
+
+        this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName,
                 "The average number of bytes fetched per request", tags);
 
-        this.fetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName, 
+        this.fetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName,
                 "The maximum number of bytes fetched per request", tags);
-        this.bytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName, 
+        this.bytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName,
                 "The average number of bytes consumed per second", tags);
         this.bytesConsumedTotal = new MetricNameTemplate("bytes-consumed-total", groupName,
                 "The total number of bytes consumed", tags);
 
-        this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName, 
+        this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName,
                 "The average number of records in each request", tags);
-        this.recordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName, 
+        this.recordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName,
                 "The average number of records consumed per second", tags);
         this.recordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName,
                 "The total number of records consumed", tags);
 
-        this.fetchLatencyAvg = new MetricNameTemplate("fetch-latency-avg", groupName, 
+        this.fetchLatencyAvg = new MetricNameTemplate("fetch-latency-avg", groupName,
                 "The average time taken for a fetch request.", tags);
-        this.fetchLatencyMax = new MetricNameTemplate("fetch-latency-max", groupName, 
+        this.fetchLatencyMax = new MetricNameTemplate("fetch-latency-max", groupName,
                 "The max time taken for any fetch request.", tags);
-        this.fetchRequestRate = new MetricNameTemplate("fetch-rate", groupName, 
+        this.fetchRequestRate = new MetricNameTemplate("fetch-rate", groupName,
                 "The number of fetch requests per second.", tags);
         this.fetchRequestTotal = new MetricNameTemplate("fetch-total", groupName,
                 "The total number of fetch requests.", tags);
 
-        this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName, 
+        this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName,
                 "The maximum lag in terms of number of records for any partition in this window", tags);
         this.recordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
                 "The minimum lead in terms of number of records for any partition in this window", tags);
 
-        this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName, 
+        this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName,
                 "The average throttle time in ms", tags);
-        this.fetchThrottleTimeMax = new MetricNameTemplate("fetch-throttle-time-max", groupName, 
+        this.fetchThrottleTimeMax = new MetricNameTemplate("fetch-throttle-time-max", groupName,
                 "The maximum throttle time in ms", tags);
 
         /***** Topic level *****/
         Set<String> topicTags = new LinkedHashSet<>(tags);
         topicTags.add("topic");
 
-        this.topicFetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, 
+        this.topicFetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName,
                 "The average number of bytes fetched per request for a topic", topicTags);
-        this.topicFetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName, 
+        this.topicFetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName,
                 "The maximum number of bytes fetched per request for a topic", topicTags);
-        this.topicBytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName, 
+        this.topicBytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName,
                 "The average number of bytes consumed per second for a topic", topicTags);
         this.topicBytesConsumedTotal = new MetricNameTemplate("bytes-consumed-total", groupName,
                 "The total number of bytes consumed for a topic", topicTags);
 
-        this.topicRecordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName, 
+        this.topicRecordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName,
                 "The average number of records in each request for a topic", topicTags);
-        this.topicRecordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName, 
+        this.topicRecordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName,
                 "The average number of records consumed per second for a topic", topicTags);
         this.topicRecordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName,
                 "The total number of records consumed for a topic", topicTags);
-        
-        this.partitionRecordsLagDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName,
-                "The latest lag of the partition (DEPRECATED use the tag based version instead)", tags);
-        this.partitionRecordsLagMaxDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName,
-                "The max lag of the partition (DEPRECATED use the tag based version instead)", tags);
-        this.partitionRecordsLagAvgDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName,
-                "The average lag of the partition (DEPRECATED use the tag based version instead)", tags);
 
         /***** Partition level *****/
         Set<String> partitionTags = new HashSet<>(topicTags);
@@ -150,9 +139,9 @@ public class FetcherMetricsRegistry {
                 "The min lead of the partition", partitionTags);
         this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName,
                 "The average lead of the partition", partitionTags);
-    
+
     }
-    
+
     public List<MetricNameTemplate> getAllTemplates() {
         return Arrays.asList(
             fetchSizeAvg,
@@ -177,9 +166,6 @@ public class FetcherMetricsRegistry {
             topicRecordsPerRequestAvg,
             topicRecordsConsumedRate,
             topicRecordsConsumedTotal,
-            partitionRecordsLagDeprecated,
-            partitionRecordsLagAvgDeprecated,
-            partitionRecordsLagMaxDeprecated,
             partitionRecordsLag,
             partitionRecordsLagAvg,
             partitionRecordsLagMax,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 9164daa..46666ca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1529,7 +1529,6 @@ public class FetcherTest {
         tags.put("topic", tp0.topic());
         tags.put("partition", String.valueOf(tp0.partition()));
         MetricName partitionLagMetric = metrics.metricName("records-lag", metricGroup, tags);
-        MetricName partitionLagMetricDeprecated = metrics.metricName(tp0 + ".records-lag", metricGroup);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
@@ -1544,9 +1543,6 @@ public class FetcherTest {
         KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
         assertEquals(50, partitionLag.value(), EPSILON);
 
-        KafkaMetric partitionLagDeprecated = allMetrics.get(partitionLagMetricDeprecated);
-        assertEquals(50, partitionLagDeprecated.value(), EPSILON);
-
         // recordsFetchLagMax should be lso - offset of the last message after receiving a non-empty FetchResponse
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
                 TimestampType.CREATE_TIME, 0L);
@@ -1559,7 +1555,6 @@ public class FetcherTest {
         // verify de-registration of partition lag
         subscriptions.unsubscribe();
         assertFalse(allMetrics.containsKey(partitionLagMetric));
-        assertFalse(allMetrics.containsKey(partitionLagMetricDeprecated));
     }
 
     @Test
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 372cc3f..b1f7b13 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -43,11 +43,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testHeaders() {
     val numRecords = 1
     val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
-    
+
     record.headers().add("headerKey", "headerValue".getBytes)
-    
+
     this.producers.head.send(record)
-    
+
     assertEquals(0, this.consumers.head.assignment.size)
     this.consumers.head.assign(List(tp).asJava)
     assertEquals(1, this.consumers.head.assignment.size)
@@ -63,23 +63,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       assertEquals("headerValue", if (header == null) null else new String(header.value()))
     }
   }
-  
+
   @Test
   def testHeadersExtendedSerializerDeserializer() {
     val numRecords = 1
     val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
 
     val extendedSerializer = new ExtendedSerializer[Array[Byte]] {
-      
+
       var serializer = new ByteArraySerializer()
-      
+
       override def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
         headers.add("content-type", "application/octet-stream".getBytes)
         serializer.serialize(topic, data)
       }
 
       override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = serializer.configure(configs, isKey)
-      
+
       override def close(): Unit = serializer.close()
 
       override def serialize(topic: String, data: Array[Byte]): Array[Byte] = {
@@ -90,9 +90,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
 
     val extendedDeserializer = new ExtendedDeserializer[Array[Byte]] {
-      
+
       var deserializer = new ByteArrayDeserializer()
-      
+
       override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = {
         val header = headers.lastHeader("content-type")
         assertEquals("application/octet-stream", if (header == null) null else new String(header.value()))
@@ -110,7 +110,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       }
 
     }
-    
+
     val producer0 = new KafkaProducer(this.producerConfig, new ByteArraySerializer(), extendedSerializer)
     producers += producer0
     producer0.send(record)
@@ -127,7 +127,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     assertEquals(numRecords, records.size)
   }
-  
+
   @Test
   def testMaxPollRecords() {
     val maxPollRecords = 2
@@ -1534,10 +1534,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       val fetchLag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
       assertNotNull(fetchLag)
 
-      val oldTags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
-      val oldFetchLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", oldTags))
-      assertEquals(fetchLag.metricValue(), oldFetchLag.metricValue())
-
       val expectedLag = numMessages - records.count
       assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.value, epsilon)
 
@@ -1594,15 +1590,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
         records = consumer.poll(100)
         !records.isEmpty
       }, "Consumer did not consume any message before timeout.")
-      val oldTags = Collections.singletonMap("client-id", "testPerPartitionLagWithMaxPollRecords")
-      val oldLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", oldTags))
 
       val tags = new util.HashMap[String, String]()
       tags.put("client-id", "testPerPartitionLagWithMaxPollRecords")
       tags.put("topic", tp.topic())
       tags.put("partition", String.valueOf(tp.partition()))
       val lag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
-      assertEquals(oldLag.metricValue(), lag.metricValue())
 
       assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count, lag.value, epsilon)
     } finally {
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 0430b43..c92e8af 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -80,6 +80,7 @@
         JMX monitoring tools that do not automatically aggregate. To get the total count for a specific request type, the tool needs to be
         updated to aggregate across different versions.
     </li>
+    <li><a href="https://cwiki.apache.org/confluence/x/uaBzB">KIP-225</a> changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" has been removed.</li>
     <li>The Scala producers, which have been deprecated since 0.10.0.0, have been removed. The Java producer has been the recommended option
         since 0.9.0.0. Note that the behaviour of the default partitioner in the Java producer differs from the default partitioner
         in the Scala producers. Users migrating should consider configuring a custom partitioner that retains the previous behaviour.</li>
