You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/06/01 07:48:53 UTC
[kafka] branch trunk updated: MINOR: inline metrics in RecordAccumulator (#12227)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 283a9955cf MINOR: inline metrics in RecordAccumulator (#12227)
283a9955cf is described below
commit 283a9955cffc566e4396fda45b550019b0d1fb8d
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Jun 1 09:48:31 2022 +0200
MINOR: inline metrics in RecordAccumulator (#12227)
Reviewers: Kvicii <Ka...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../producer/internals/RecordAccumulator.java | 40 ++++++++--------------
1 file changed, 14 insertions(+), 26 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index ef1ff7d7d4..4168ea68aa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -36,14 +36,11 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
@@ -195,29 +192,20 @@ public class RecordAccumulator {
}
private void registerMetrics(Metrics metrics, String metricGrpName) {
- MetricName metricName = metrics.metricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records");
- Measurable waitingThreads = new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.queued();
- }
- };
- metrics.addMetric(metricName, waitingThreads);
-
- metricName = metrics.metricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).");
- Measurable totalBytes = new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.totalMemory();
- }
- };
- metrics.addMetric(metricName, totalBytes);
-
- metricName = metrics.metricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).");
- Measurable availableBytes = new Measurable() {
- public double measure(MetricConfig config, long now) {
- return free.availableMemory();
- }
- };
- metrics.addMetric(metricName, availableBytes);
+ metrics.addMetric(
+ metrics.metricName("waiting-threads", metricGrpName,
+ "The number of user threads blocked waiting for buffer memory to enqueue their records"),
+ (config, now) -> free.queued());
+
+ metrics.addMetric(
+ metrics.metricName("buffer-total-bytes", metricGrpName,
+ "The maximum amount of buffer memory the client can use (whether or not it is currently used)."),
+ (config, now) -> free.totalMemory());
+
+ metrics.addMetric(
+ metrics.metricName("buffer-available-bytes", metricGrpName,
+ "The total amount of buffer memory that is not being used (either unallocated or in the free list)."),
+ (config, now) -> free.availableMemory());
}
private void setPartition(AppendCallbacks callbacks, int partition) {