You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/06/01 07:48:53 UTC

[kafka] branch trunk updated: MINOR: inline metrics in RecordAccumulator (#12227)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 283a9955cf MINOR: inline metrics in RecordAccumulator (#12227)
283a9955cf is described below

commit 283a9955cffc566e4396fda45b550019b0d1fb8d
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Jun 1 09:48:31 2022 +0200

    MINOR: inline metrics in RecordAccumulator (#12227)
    
    Reviewers: Kvicii <Ka...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../producer/internals/RecordAccumulator.java      | 40 ++++++++--------------
 1 file changed, 14 insertions(+), 26 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index ef1ff7d7d4..4168ea68aa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -36,14 +36,11 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionRatioEstimator;
@@ -195,29 +192,20 @@ public class RecordAccumulator {
     }
 
     private void registerMetrics(Metrics metrics, String metricGrpName) {
-        MetricName metricName = metrics.metricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records");
-        Measurable waitingThreads = new Measurable() {
-            public double measure(MetricConfig config, long now) {
-                return free.queued();
-            }
-        };
-        metrics.addMetric(metricName, waitingThreads);
-
-        metricName = metrics.metricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).");
-        Measurable totalBytes = new Measurable() {
-            public double measure(MetricConfig config, long now) {
-                return free.totalMemory();
-            }
-        };
-        metrics.addMetric(metricName, totalBytes);
-
-        metricName = metrics.metricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).");
-        Measurable availableBytes = new Measurable() {
-            public double measure(MetricConfig config, long now) {
-                return free.availableMemory();
-            }
-        };
-        metrics.addMetric(metricName, availableBytes);
+        metrics.addMetric(
+            metrics.metricName("waiting-threads", metricGrpName,
+                "The number of user threads blocked waiting for buffer memory to enqueue their records"),
+            (config, now) -> free.queued());
+
+        metrics.addMetric(
+            metrics.metricName("buffer-total-bytes", metricGrpName,
+                "The maximum amount of buffer memory the client can use (whether or not it is currently used)."),
+            (config, now) -> free.totalMemory());
+
+        metrics.addMetric(
+            metrics.metricName("buffer-available-bytes", metricGrpName,
+                "The total amount of buffer memory that is not being used (either unallocated or in the free list)."),
+            (config, now) -> free.availableMemory());
     }
 
     private void setPartition(AppendCallbacks callbacks, int partition) {