Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/25 02:44:00 UTC

[jira] [Commented] (KAFKA-6590) Consumer bytes-fetched and records-fetched metrics are not aggregated correctly

    [ https://issues.apache.org/jira/browse/KAFKA-6590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375898#comment-16375898 ] 

ASF GitHub Bot commented on KAFKA-6590:
---------------------------------------

hachikuji closed pull request #4278: KAFKA-6590: Fix bug in aggregation of consumer fetch bytes and counts metrics
URL: https://github.com/apache/kafka/pull/4278
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a fork's pull request is merged, it is reproduced below
for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ac199bc4b0d..b46a3a29380 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1276,8 +1276,8 @@ public void record(TopicPartition partition, int bytes, int records) {
 
             if (this.unrecordedPartitions.isEmpty()) {
                 // once all expected partitions from the fetch have reported in, record the metrics
-                this.sensors.bytesFetched.record(topicFetchMetric.fetchBytes);
-                this.sensors.recordsFetched.record(topicFetchMetric.fetchRecords);
+                this.sensors.bytesFetched.record(this.fetchMetrics.fetchBytes);
+                this.sensors.recordsFetched.record(this.fetchMetrics.fetchRecords);
 
                 // also record per-topic metrics
                 for (Map.Entry<String, FetchMetrics> entry: this.topicFetchMetrics.entrySet()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index e8bb4e696ba..27aba410746 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1447,26 +1447,50 @@ public void testReadCommittedLagMetric() {
 
     @Test
     public void testFetchResponseMetrics() {
-        subscriptions.assignFromUser(singleton(tp0));
-        subscriptions.seek(tp0, 0);
+        String topic1 = "foo";
+        String topic2 = "bar";
+        TopicPartition tp1 = new TopicPartition(topic1, 0);
+        TopicPartition tp2 = new TopicPartition(topic2, 0);
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(topic1, 1);
+        partitionCounts.put(topic2, 1);
+        Cluster cluster = TestUtils.clusterWith(1, partitionCounts);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
-        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
-        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
-
-        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
-                TimestampType.CREATE_TIME, 0L);
-        for (int v = 0; v < 3; v++)
-            builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
-        MemoryRecords records = builder.build();
+        subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
 
         int expectedBytes = 0;
-        for (Record record : records.records())
-            expectedBytes += record.sizeInBytes();
+        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> fetchPartitionData = new LinkedHashMap<>();
 
-        fetchRecords(tp0, records, Errors.NONE, 100L, 0);
+        for (TopicPartition tp : Utils.mkSet(tp1, tp2)) {
+            subscriptions.seek(tp, 0);
+
+            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                    TimestampType.CREATE_TIME, 0L);
+            for (int v = 0; v < 3; v++)
+                builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+            MemoryRecords records = builder.build();
+            for (Record record : records.records())
+                expectedBytes += record.sizeInBytes();
+
+            fetchPartitionData.put(tp, new FetchResponse.PartitionData(Errors.NONE, 15L,
+                    FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
+        }
+
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(new FetchResponse(Errors.NONE, fetchPartitionData, 0, INVALID_SESSION_ID));
+        consumerClient.poll(0);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
+        assertEquals(3, fetchedRecords.get(tp1).size());
+        assertEquals(3, fetchedRecords.get(tp2).size());
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
+        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg));
         assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
-        assertEquals(3, recordsCountAverage.value(), EPSILON);
+        assertEquals(6, recordsCountAverage.value(), EPSILON);
     }
 
     @Test
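
To summarize the change without wading through the diff: once every partition expected in a fetch response has reported, the aggregator should record the fetch-wide running totals, not the totals of the single topic that happened to report last. Below is a minimal, self-contained Java sketch of that pattern; the class and method names (FetchMetricsSketch, recordFetchTotals) are illustrative only and do not mirror the actual Fetcher internals.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Illustrative aggregator: accumulates byte/record counts per topic and
// fetch-wide, and records the fetch-wide totals only once every expected
// partition has reported in.
public class FetchMetricsSketch {
    private final Set<String> unrecordedPartitions;             // "topic-partition" keys still expected
    private final Map<String, long[]> perTopicTotals = new HashMap<>(); // topic -> {bytes, records}
    private final BiConsumer<Long, Long> recordFetchTotals;     // stands in for the bytesFetched/recordsFetched sensors
    private long totalBytes;
    private long totalRecords;

    public FetchMetricsSketch(Set<String> expectedPartitions, BiConsumer<Long, Long> recordFetchTotals) {
        this.unrecordedPartitions = new HashSet<>(expectedPartitions);
        this.recordFetchTotals = recordFetchTotals;
    }

    public void record(String topic, int partition, long bytes, long records) {
        unrecordedPartitions.remove(topic + "-" + partition);

        long[] topicTotals = perTopicTotals.computeIfAbsent(topic, t -> new long[2]);
        topicTotals[0] += bytes;
        topicTotals[1] += records;

        // Fetch-wide running totals. The pre-fix code recorded the per-topic totals
        // of whichever topic happened to report last instead of these sums.
        totalBytes += bytes;
        totalRecords += records;

        if (unrecordedPartitions.isEmpty()) {
            // All expected partitions have reported: record the aggregated totals once.
            recordFetchTotals.accept(totalBytes, totalRecords);
        }
    }

    public static void main(String[] args) {
        Set<String> expected = new HashSet<>(Arrays.asList("foo-0", "bar-0"));
        FetchMetricsSketch sketch = new FetchMetricsSketch(expected,
                (bytes, records) -> System.out.println("fetch totals: " + bytes + " bytes, " + records + " records"));
        sketch.record("foo", 0, 300, 3);
        sketch.record("bar", 0, 300, 3);   // prints "fetch totals: 600 bytes, 6 records"
    }
}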


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Consumer bytes-fetched and records-fetched metrics are not aggregated correctly
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-6590
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6590
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Priority: Major
>
> There is a bug in the recording of these metrics which causes the aggregated value from only one topic in the response to be recorded for the entire fetch.
> This issue was found by GitHub user kiest, who submitted a patch here: https://github.com/apache/kafka/pull/4278/files
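
For anyone checking whether their application is affected, here is a hedged sketch of reading the aggregated fetch metrics from a live consumer via KafkaConsumer#metrics(). The group and metric names ("consumer-fetch-manager-metrics", "fetch-size-avg", "records-per-request-avg") correspond to the sensors exercised in the test above, but treat them as assumptions for your client version.

import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Print the fetch-level metrics affected by this bug. Group/metric names are
// assumptions based on the test above; verify them against your client version.
public class FetchMetricsDump {
    public static void printFetchMetrics(KafkaConsumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if ("consumer-fetch-manager-metrics".equals(name.group())
                    && (name.name().equals("fetch-size-avg") || name.name().equals("records-per-request-avg"))) {
                System.out.println(name.name() + " " + name.tags() + " = " + entry.getValue().metricValue());
            }
        }
    }
}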



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)