You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/02/06 18:40:55 UTC
[kafka] branch trunk updated: KAFKA-6184;
report a metric of the lag between the consumer offset ...
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7c97b23 KAFKA-6184; report a metric of the lag between the consumer offset ...
7c97b23 is described below
commit 7c97b239a5261cf030984f71427287196e657d95
Author: huxihx <hu...@hotmail.com>
AuthorDate: Tue Feb 6 10:40:51 2018 -0800
KAFKA-6184; report a metric of the lag between the consumer offset ...
Add the consumer-level `records-lead-min` metric and the partition-level `{topic}-{partition}.records-lead`, `records-lead-min` and `records-lead-avg` fetcher metrics.
junrao Please kindly review. Thanks.
Author: huxihx <hu...@hotmail.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #4191 from huxihx/KAFKA-6184
---
.../kafka/clients/consumer/internals/Fetcher.java | 47 ++++++++-
.../consumer/internals/FetcherMetricsRegistry.java | 19 +++-
.../consumer/internals/SubscriptionState.java | 11 ++
.../clients/consumer/internals/FetcherTest.java | 54 ++++++++++
.../kafka/api/PlaintextConsumerTest.scala | 111 +++++++++++++++++++++
5 files changed, 237 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 32782ee..0711d15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BufferSupplier;
@@ -587,6 +588,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
if (partitionLag != null)
this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
+ Long lead = subscriptions.partitionLead(partitionRecords.partition);
+ if (lead != null) {
+ this.sensors.recordPartitionLead(partitionRecords.partition, lead);
+ }
+
return partRecords;
} else {
// these records aren't next in line based on the last consumed position, ignore them
@@ -871,6 +877,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
subscriptions.updateHighWatermark(tp, partition.highWatermark);
}
+ if (partition.logStartOffset >= 0) {
+ log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);
+ subscriptions.updateLogStartOffset(tp, partition.logStartOffset);
+ }
+
if (partition.lastStableOffset >= 0) {
log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
@@ -945,7 +956,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
@Override
public void onAssignment(Set<TopicPartition> assignment) {
- sensors.updatePartitionLagSensors(assignment);
+ sensors.updatePartitionLagAndLeadSensors(assignment);
}
public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
@@ -1261,6 +1272,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private final Sensor recordsFetched;
private final Sensor fetchLatency;
private final Sensor recordsFetchLag;
+ private final Sensor recordsFetchLead;
private Set<TopicPartition> assignedPartitions;
@@ -1287,6 +1299,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
this.recordsFetchLag = metrics.sensor("records-lag");
this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max());
+
+ this.recordsFetchLead = metrics.sensor("records-lead");
+ this.recordsFetchLead.add(metrics.metricInstance(metricsRegistry.recordsLeadMin), new Min());
}
private void recordTopicFetchMetrics(String topic, int bytes, int records) {
@@ -1322,16 +1337,37 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
recordsFetched.record(records);
}
- private void updatePartitionLagSensors(Set<TopicPartition> assignedPartitions) {
+ private void updatePartitionLagAndLeadSensors(Set<TopicPartition> assignedPartitions) {
if (this.assignedPartitions != null) {
for (TopicPartition tp : this.assignedPartitions) {
- if (!assignedPartitions.contains(tp))
+ if (!assignedPartitions.contains(tp)) {
metrics.removeSensor(partitionLagMetricName(tp));
+ metrics.removeSensor(partitionLeadMetricName(tp));
+ }
}
}
this.assignedPartitions = assignedPartitions;
}
+ private void recordPartitionLead(TopicPartition tp, long lead) {
+ this.recordsFetchLead.record(lead);
+
+ String name = partitionLeadMetricName(tp);
+ Sensor recordsLead = this.metrics.getSensor(name);
+ if (recordsLead == null) {
+ Map<String, String> metricTags = new HashMap<>(2);
+ metricTags.put("topic", tp.topic().replace('.', '_'));
+ metricTags.put("partition", String.valueOf(tp.partition()));
+
+ recordsLead = this.metrics.sensor(name);
+
+ recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLead, metricTags), new Value());
+ recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadMin, metricTags), new Min());
+ recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadAvg, metricTags), new Avg());
+ }
+ recordsLead.record(lead);
+ }
+
private void recordPartitionLag(TopicPartition tp, long lag) {
this.recordsFetchLag.record(lag);
@@ -1364,6 +1400,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private static String partitionLagMetricName(TopicPartition tp) {
return tp + ".records-lag";
}
+
+ private static String partitionLeadMetricName(TopicPartition tp) {
+ return tp + ".records-lead";
+ }
+
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
index 301363a..6eb4fa2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
@@ -38,6 +38,7 @@ public class FetcherMetricsRegistry {
public MetricNameTemplate fetchRequestRate;
public MetricNameTemplate fetchRequestTotal;
public MetricNameTemplate recordsLagMax;
+ public MetricNameTemplate recordsLeadMin;
public MetricNameTemplate fetchThrottleTimeAvg;
public MetricNameTemplate fetchThrottleTimeMax;
public MetricNameTemplate topicFetchSizeAvg;
@@ -50,6 +51,9 @@ public class FetcherMetricsRegistry {
public MetricNameTemplate partitionRecordsLag;
public MetricNameTemplate partitionRecordsLagMax;
public MetricNameTemplate partitionRecordsLagAvg;
+ public MetricNameTemplate partitionRecordsLead;
+ public MetricNameTemplate partitionRecordsLeadMin;
+ public MetricNameTemplate partitionRecordsLeadAvg;
// To remove in 2.0
public MetricNameTemplate partitionRecordsLagDeprecated;
public MetricNameTemplate partitionRecordsLagMaxDeprecated;
@@ -96,6 +100,8 @@ public class FetcherMetricsRegistry {
this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName,
"The maximum lag in terms of number of records for any partition in this window", tags);
+ this.recordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
+ "The minimum lead in terms of number of records for any partition in this window", tags);
this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName,
"The average throttle time in ms", tags);
@@ -138,7 +144,12 @@ public class FetcherMetricsRegistry {
"The max lag of the partition", partitionTags);
this.partitionRecordsLagAvg = new MetricNameTemplate("records-lag-avg", groupName,
"The average lag of the partition", partitionTags);
-
+ this.partitionRecordsLead = new MetricNameTemplate("records-lead", groupName,
+ "The latest lead of the partition", partitionTags);
+ this.partitionRecordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
+ "The min lead of the partition", partitionTags);
+ this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName,
+ "The average lead of the partition", partitionTags);
}
@@ -156,6 +167,7 @@ public class FetcherMetricsRegistry {
fetchRequestRate,
fetchRequestTotal,
recordsLagMax,
+ recordsLeadMin,
fetchThrottleTimeAvg,
fetchThrottleTimeMax,
topicFetchSizeAvg,
@@ -170,7 +182,10 @@ public class FetcherMetricsRegistry {
partitionRecordsLagMaxDeprecated,
partitionRecordsLag,
partitionRecordsLagAvg,
- partitionRecordsLagMax
+ partitionRecordsLagMax,
+ partitionRecordsLead,
+ partitionRecordsLeadMin,
+ partitionRecordsLeadAvg
);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index adced58..8228707 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -327,10 +327,19 @@ public class SubscriptionState {
return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position;
}
+ public Long partitionLead(TopicPartition tp) {
+ TopicPartitionState topicPartitionState = assignedState(tp);
+ return topicPartitionState.logStartOffset == null ? null : topicPartitionState.position - topicPartitionState.logStartOffset;
+ }
+
public void updateHighWatermark(TopicPartition tp, long highWatermark) {
assignedState(tp).highWatermark = highWatermark;
}
+ public void updateLogStartOffset(TopicPartition tp, long logStartOffset) {
+ assignedState(tp).logStartOffset = logStartOffset;
+ }
+
public void updateLastStableOffset(TopicPartition tp, long lastStableOffset) {
assignedState(tp).lastStableOffset = lastStableOffset;
}
@@ -435,6 +444,7 @@ public class SubscriptionState {
private static class TopicPartitionState {
private Long position; // last consumed position
private Long highWatermark; // the high watermark from last fetch
+ private Long logStartOffset; // the log start offset
private Long lastStableOffset;
private OffsetAndMetadata committed; // last committed position
private boolean paused; // whether this partition has been paused by the user
@@ -444,6 +454,7 @@ public class SubscriptionState {
this.paused = false;
this.position = null;
this.highWatermark = null;
+ this.logStartOffset = null;
this.lastStableOffset = null;
this.committed = null;
this.resetStrategy = null;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index a0205e7..e81e3d1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1211,6 +1211,45 @@ public class FetcherTest {
}
@Test
+ public void testFetcherLeadMetric() {
+ subscriptions.assignFromUser(singleton(tp0));
+ subscriptions.seek(tp0, 0);
+
+ MetricName minLeadMetric = metrics.metricInstance(metricsRegistry.recordsLeadMin);
+ Map<String, String> tags = new HashMap<>(2);
+ tags.put("topic", tp0.topic());
+ tags.put("partition", String.valueOf(tp0.partition()));
+ MetricName partitionLeadMetric = metrics.metricName("records-lead", metricGroup, "", tags);
+
+ Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+ KafkaMetric recordsFetchLeadMin = allMetrics.get(minLeadMetric);
+
+ // recordsFetchLeadMin should be initialized to MAX_VALUE
+ assertEquals(Double.MAX_VALUE, recordsFetchLeadMin.value(), EPSILON);
+
+ // recordsFetchLeadMin should be position - logStartOffset after receiving an empty FetchResponse
+ fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, -1L, 0L, 0);
+ assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
+
+ KafkaMetric partitionLead = allMetrics.get(partitionLeadMetric);
+ assertEquals(0L, partitionLead.value(), EPSILON);
+
+ // recordsFetchLeadMin should be position - logStartOffset after receiving a non-empty FetchResponse
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+ TimestampType.CREATE_TIME, 0L);
+ for (int v = 0; v < 3; v++) {
+ builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+ }
+ fetchRecords(tp0, builder.build(), Errors.NONE, 200L, -1L, 0L, 0);
+ assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
+ assertEquals(3L, partitionLead.value(), EPSILON);
+
+ // verify de-registration of partition lead
+ subscriptions.unsubscribe();
+ assertFalse(allMetrics.containsKey(partitionLeadMetric));
+ }
+
+ @Test
public void testReadCommittedLagMetric() {
Metrics metrics = new Metrics();
fetcher = createFetcher(subscriptions, metrics, new ByteArrayDeserializer(),
@@ -1430,6 +1469,14 @@ public class FetcherTest {
return fetcher.fetchedRecords();
}
+ private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(
+ TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, long logStartOffset, int throttleTime) {
+ assertEquals(1, fetcher.sendFetches());
+ client.prepareResponse(fetchResponse(tp, records, error, hw, lastStableOffset, logStartOffset, throttleTime));
+ consumerClient.poll(0);
+ return fetcher.fetchedRecords();
+ }
+
@Test
public void testGetOffsetsForTimesTimeout() {
try {
@@ -2132,6 +2179,13 @@ public class FetcherTest {
return new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
}
+ private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
+ long lastStableOffset, long logStartOffset, int throttleTime) {
+ Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
+ new FetchResponse.PartitionData(error, hw, lastStableOffset, logStartOffset, null, records));
+ return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
+ }
+
private MetadataResponse newMetadataResponse(String topic, Errors error) {
List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
if (error == Errors.NONE) {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 103a28a..a06e9e3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1375,6 +1375,54 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(500, consumer0.committed(tp2).offset)
}
+ @Test
+ def testPerPartitionLeadMetricsCleanUpWithSubscribe() {
+ val numMessages = 1000
+ val topic2 = "topic2"
+ createTopic(topic2, 2, serverCount)
+ // send some messages.
+ sendRecords(numMessages, tp)
+ // Test subscribe
+ // Create a consumer and consume some messages.
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+ consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+ val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ try {
+ val listener0 = new TestConsumerReassignmentListener
+ consumer.subscribe(List(topic, topic2).asJava, listener0)
+ var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+ TestUtils.waitUntilTrue(() => {
+ records = consumer.poll(100)
+ !records.records(tp).isEmpty
+ }, "Consumer did not consume any message before timeout.")
+ assertEquals("should be assigned once", 1, listener0.callsToAssigned)
+ // Verify the metric exists.
+ val tags1 = new util.HashMap[String, String]()
+ tags1.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+ tags1.put("topic", tp.topic())
+ tags1.put("partition", String.valueOf(tp.partition()))
+
+ val tags2 = new util.HashMap[String, String]()
+ tags2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+ tags2.put("topic", tp2.topic())
+ tags2.put("partition", String.valueOf(tp2.partition()))
+ val fetchLead0 = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1))
+ assertNotNull(fetchLead0)
+ assertTrue(s"The lead should be ${records.count}", fetchLead0.metricValue() == records.count)
+
+ // Remove topic from subscription
+ consumer.subscribe(List(topic2).asJava, listener0)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(100)
+ listener0.callsToAssigned >= 2
+ }, "Expected rebalance did not occur.")
+ // Verify the metric has gone
+ assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)))
+ assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2)))
+ } finally {
+ consumer.close()
+ }
+ }
@Test
def testPerPartitionLagMetricsCleanUpWithSubscribe() {
@@ -1427,6 +1475,41 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
@Test
+ def testPerPartitionLeadMetricsCleanUpWithAssign() {
+ val numMessages = 1000
+ // Test assign
+ // send some messages.
+ sendRecords(numMessages, tp)
+ sendRecords(numMessages, tp2)
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign")
+ consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign")
+ val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ try {
+ consumer.assign(List(tp).asJava)
+ var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+ TestUtils.waitUntilTrue(() => {
+ records = consumer.poll(100)
+ !records.records(tp).isEmpty
+ }, "Consumer did not consume any message before timeout.")
+ // Verify the metric exists.
+ val tags = new util.HashMap[String, String]()
+ tags.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign")
+ tags.put("topic", tp.topic())
+ tags.put("partition", String.valueOf(tp.partition()))
+ val fetchLead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
+ assertNotNull(fetchLead)
+
+ assertTrue(s"The lead should be ${records.count}", records.count == fetchLead.metricValue())
+
+ consumer.assign(List(tp2).asJava)
+ TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
+ assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)))
+ } finally {
+ consumer.close()
+ }
+ }
+
+ @Test
def testPerPartitionLagMetricsCleanUpWithAssign() {
val numMessages = 1000
// Test assign
@@ -1468,6 +1551,34 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
@Test
+ def testPerPartitionLeadWithMaxPollRecords() {
+ val numMessages = 1000
+ val maxPollRecords = 10
+ sendRecords(numMessages, tp)
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords")
+ consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords")
+ consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
+ val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ consumer.assign(List(tp).asJava)
+ try {
+ var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+ TestUtils.waitUntilTrue(() => {
+ records = consumer.poll(100)
+ !records.isEmpty
+ }, "Consumer did not consume any message before timeout.")
+
+ val tags = new util.HashMap[String, String]()
+ tags.put("client-id", "testPerPartitionLeadWithMaxPollRecords")
+ tags.put("topic", tp.topic())
+ tags.put("partition", String.valueOf(tp.partition()))
+ val lead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
+ assertTrue(s"The lead should be $maxPollRecords", lead.metricValue() == maxPollRecords)
+ } finally {
+ consumer.close()
+ }
+ }
+
+ @Test
def testPerPartitionLagWithMaxPollRecords() {
val numMessages = 1000
val maxPollRecords = 10
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.