You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2018/01/15 00:19:03 UTC
[kafka] branch trunk updated: KAFKA-5890;
records.lag should use tags for topic and partition rather than
using metric name.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5d81639 KAFKA-5890; records.lag should use tags for topic and partition rather than using metric name.
5d81639 is described below
commit 5d81639907869ce7355c40d2bac176a655e52074
Author: cmolter <cm...@apple.com>
AuthorDate: Sun Jan 14 16:18:39 2018 -0800
KAFKA-5890; records.lag should use tags for topic and partition rather than using metric name.
This is the implementation of KIP-225.
It marks the previous metrics as deprecated in the documentation and adds new metrics using tags.
Testing verifies that both the new and the old metrics report the same value.
Author: cmolter <cm...@apple.com>
Reviewers: Jiangjie (Becket) Qin <be...@gmail.com>
Closes #4362 from lahabana/kafka-5890
---
.../kafka/clients/consumer/internals/Fetcher.java | 21 +++++++----
.../consumer/internals/FetcherMetricsRegistry.java | 28 +++++++++++----
.../clients/consumer/internals/FetcherTest.java | 16 +++++++--
.../kafka/api/PlaintextConsumerTest.scala | 41 +++++++++++++++++-----
docs/upgrade.html | 1 +
5 files changed, 85 insertions(+), 22 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4c68f1f..5dc0b26 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1327,16 +1327,25 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
String name = partitionLagMetricName(tp);
Sensor recordsLag = this.metrics.getSensor(name);
if (recordsLag == null) {
+ Map<String, String> metricTags = new HashMap<>(2);
+ metricTags.put("topic", tp.topic().replace('.', '_'));
+ metricTags.put("partition", String.valueOf(tp.partition()));
+
recordsLag = this.metrics.sensor(name);
+
+ recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag, metricTags), new Value());
+ recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagMax, metricTags), new Max());
+ recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagAvg, metricTags), new Avg());
+
recordsLag.add(this.metrics.metricName(name,
- metricsRegistry.partitionRecordsLag.group(),
- metricsRegistry.partitionRecordsLag.description()), new Value());
+ metricsRegistry.partitionRecordsLagDeprecated.group(),
+ metricsRegistry.partitionRecordsLagDeprecated.description()), new Value());
recordsLag.add(this.metrics.metricName(name + "-max",
- metricsRegistry.partitionRecordsLagMax.group(),
- metricsRegistry.partitionRecordsLagMax.description()), new Max());
+ metricsRegistry.partitionRecordsLagMaxDeprecated.group(),
+ metricsRegistry.partitionRecordsLagMaxDeprecated.description()), new Max());
recordsLag.add(this.metrics.metricName(name + "-avg",
- metricsRegistry.partitionRecordsLagAvg.group(),
- metricsRegistry.partitionRecordsLagAvg.description()), new Avg());
+ metricsRegistry.partitionRecordsLagAvgDeprecated.group(),
+ metricsRegistry.partitionRecordsLagAvgDeprecated.description()), new Avg());
}
recordsLag.record(lag);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
index acf42ec..f15ac06 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
@@ -49,6 +49,10 @@ public class FetcherMetricsRegistry {
public MetricNameTemplate partitionRecordsLag;
public MetricNameTemplate partitionRecordsLagMax;
public MetricNameTemplate partitionRecordsLagAvg;
+ // To remove in 2.0
+ public MetricNameTemplate partitionRecordsLagDeprecated;
+ public MetricNameTemplate partitionRecordsLagMaxDeprecated;
+ public MetricNameTemplate partitionRecordsLagAvgDeprecated;
public FetcherMetricsRegistry() {
this(new HashSet<String>(), "");
@@ -117,13 +121,22 @@ public class FetcherMetricsRegistry {
this.topicRecordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName,
"The total number of records consumed for a topic", topicTags);
+ this.partitionRecordsLagDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName,
+ "The latest lag of the partition (DEPRECATED use the tag based version instead)", tags);
+ this.partitionRecordsLagMaxDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName,
+ "The max lag of the partition (DEPRECATED use the tag based version instead)", tags);
+ this.partitionRecordsLagAvgDeprecated = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName,
+ "The average lag of the partition (DEPRECATED use the tag based version instead)", tags);
+
/***** Partition level *****/
- this.partitionRecordsLag = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName,
- "The latest lag of the partition", tags);
- this.partitionRecordsLagMax = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName,
- "The max lag of the partition", tags);
- this.partitionRecordsLagAvg = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName,
- "The average lag of the partition", tags);
+ Set<String> partitionTags = new HashSet<>(topicTags);
+ partitionTags.add("partition");
+ this.partitionRecordsLag = new MetricNameTemplate("records-lag", groupName,
+ "The latest lag of the partition", partitionTags);
+ this.partitionRecordsLagMax = new MetricNameTemplate("records-lag-max", groupName,
+ "The max lag of the partition", partitionTags);
+ this.partitionRecordsLagAvg = new MetricNameTemplate("records-lag-avg", groupName,
+ "The average lag of the partition", partitionTags);
}
@@ -151,6 +164,9 @@ public class FetcherMetricsRegistry {
topicRecordsPerRequestAvg,
topicRecordsConsumedRate,
topicRecordsConsumedTotal,
+ partitionRecordsLagDeprecated,
+ partitionRecordsLagAvgDeprecated,
+ partitionRecordsLagMaxDeprecated,
partitionRecordsLag,
partitionRecordsLagAvg,
partitionRecordsLagMax
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7c1d9ba..26d7a50 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1171,7 +1171,10 @@ public class FetcherTest {
subscriptions.seek(tp0, 0);
MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
- MetricName partitionLagMetric = metrics.metricName(tp0 + ".records-lag", metricGroup);
+ Map<String, String> tags = new HashMap<>();
+ tags.put("topic", tp0.topic());
+ tags.put("partition", String.valueOf(tp0.partition()));
+ MetricName partitionLagMetric = metrics.metricName("records-lag", metricGroup, tags);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
@@ -1210,7 +1213,12 @@ public class FetcherTest {
subscriptions.seek(tp0, 0);
MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
- MetricName partitionLagMetric = metrics.metricName(tp0 + ".records-lag", metricGroup);
+
+ Map<String, String> tags = new HashMap<>();
+ tags.put("topic", tp0.topic());
+ tags.put("partition", String.valueOf(tp0.partition()));
+ MetricName partitionLagMetric = metrics.metricName("records-lag", metricGroup, tags);
+ MetricName partitionLagMetricDeprecated = metrics.metricName(tp0 + ".records-lag", metricGroup);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
@@ -1225,6 +1233,9 @@ public class FetcherTest {
KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
assertEquals(50, partitionLag.value(), EPSILON);
+ KafkaMetric partitionLagDeprecated = allMetrics.get(partitionLagMetricDeprecated);
+ assertEquals(50, partitionLagDeprecated.value(), EPSILON);
+
// recordsFetchLagMax should be lso - offset of the last message after receiving a non-empty FetchResponse
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
@@ -1237,6 +1248,7 @@ public class FetcherTest {
// verify de-registration of partition lag
subscriptions.unsubscribe();
assertFalse(allMetrics.containsKey(partitionLagMetric));
+ assertFalse(allMetrics.containsKey(partitionLagMetricDeprecated));
}
@Test
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 04935f8..103a28a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1398,8 +1398,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}, "Consumer did not consume any message before timeout.")
assertEquals("should be assigned once", 1, listener0.callsToAssigned)
// Verify the metric exist.
- val tags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
- val fetchLag0 = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags))
+ val tags1 = new util.HashMap[String, String]()
+ tags1.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
+ tags1.put("topic", tp.topic())
+ tags1.put("partition", String.valueOf(tp.partition()))
+
+ val tags2 = new util.HashMap[String, String]()
+ tags2.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
+ tags2.put("topic", tp2.topic())
+ tags2.put("partition", String.valueOf(tp2.partition()))
+ val fetchLag0 = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1))
assertNotNull(fetchLag0)
val expectedLag = numMessages - records.count
assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag0.value, epsilon)
@@ -1411,8 +1419,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
listener0.callsToAssigned >= 2
}, "Expected rebalance did not occur.")
// Verify the metric has gone
- assertNull(consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)))
- assertNull(consumer.metrics.get(new MetricName(tp2 + ".records-lag", "consumer-fetch-manager-metrics", "", tags)))
+ assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)))
+ assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2)))
} finally {
consumer.close()
}
@@ -1436,15 +1444,24 @@ class PlaintextConsumerTest extends BaseConsumerTest {
!records.records(tp).isEmpty
}, "Consumer did not consume any message before timeout.")
// Verify the metric exist.
- val tags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
- val fetchLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags))
+ val tags = new util.HashMap[String, String]()
+ tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
+ tags.put("topic", tp.topic())
+ tags.put("partition", String.valueOf(tp.partition()))
+ val fetchLag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
assertNotNull(fetchLag)
+
+ val oldTags = Collections.singletonMap("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
+ val oldFetchLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", oldTags))
+ assertEquals(fetchLag.metricValue(), oldFetchLag.metricValue())
+
val expectedLag = numMessages - records.count
assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.value, epsilon)
consumer.assign(List(tp2).asJava)
TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
assertNull(consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)))
+ assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)))
} finally {
consumer.close()
}
@@ -1466,8 +1483,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
records = consumer.poll(100)
!records.isEmpty
}, "Consumer did not consume any message before timeout.")
- val tags = Collections.singletonMap("client-id", "testPerPartitionLagWithMaxPollRecords")
- val lag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags))
+ val oldTags = Collections.singletonMap("client-id", "testPerPartitionLagWithMaxPollRecords")
+ val oldLag = consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", oldTags))
+
+ val tags = new util.HashMap[String, String]()
+ tags.put("client-id", "testPerPartitionLagWithMaxPollRecords")
+ tags.put("topic", tp.topic())
+ tags.put("partition", String.valueOf(tp.partition()))
+ val lag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
+ assertEquals(oldLag.metricValue(), lag.metricValue())
+
assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count, lag.value, epsilon)
} finally {
consumer.close()
diff --git a/docs/upgrade.html b/docs/upgrade.html
index b8b88b9..6140059 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -66,6 +66,7 @@
<li>The kafka artifact in Maven no longer depends on log4j or slf4j-log4j12. Similarly to the kafka-clients artifact, users
can now choose the logging back-end by including the appropriate slf4j module (slf4j-log4j12, logback, etc.). The release
tarball still includes log4j and slf4j-log4j12.</li>
+ <li><a href="https://cwiki.apache.org/confluence/x/uaBzB">KIP-225</a> changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" is deprecated and will be removed in 2.0.0.</li>
</ul>
<h5><a id="upgrade_110_new_protocols" href="#upgrade_110_new_protocols">New Protocol Versions</a></h5>
--
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].