You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/07 21:43:06 UTC
kafka git commit: KAFKA-2306: add another metric for buffer exhausted;
reviewed by Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk ad485e148 -> a99f70feb
KAFKA-2306: add another metric for buffer exhausted; reviewed by Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a99f70fe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a99f70fe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a99f70fe
Branch: refs/heads/trunk
Commit: a99f70feb23db9ac4274cad9e8cbbf9934d3d075
Parents: ad485e1
Author: Dong Lin <li...@gmail.com>
Authored: Tue Jul 7 12:42:49 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Jul 7 12:42:49 2015 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/producer/KafkaProducer.java | 4 ++++
.../clients/producer/internals/RecordAccumulator.java | 10 ++++++++--
2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a99f70fe/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 5671a3f..03b8dd2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -403,6 +403,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
} catch (InterruptedException e) {
this.errors.record();
throw new InterruptException(e);
+ } catch (BufferExhaustedException e) {
+ this.errors.record();
+ this.metrics.sensor("buffer-exhausted-records").record();
+ throw e;
} catch (KafkaException e) {
this.errors.record();
throw e;
http://git-wip-us.apache.org/repos/asf/kafka/blob/a99f70fe/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 87dbd64..a152bd7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
@@ -112,7 +114,6 @@ public final class RecordAccumulator {
}
private void registerMetrics(Metrics metrics, String metricGrpName, Map<String, String> metricTags) {
-
MetricName metricName = new MetricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records", metricTags);
Measurable waitingThreads = new Measurable() {
public double measure(MetricConfig config, long now) {
@@ -120,7 +121,7 @@ public final class RecordAccumulator {
}
};
metrics.addMetric(metricName, waitingThreads);
-
+
metricName = new MetricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).", metricTags);
Measurable totalBytes = new Measurable() {
public double measure(MetricConfig config, long now) {
@@ -128,6 +129,7 @@ public final class RecordAccumulator {
}
};
metrics.addMetric(metricName, totalBytes);
+
metricName = new MetricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).", metricTags);
Measurable availableBytes = new Measurable() {
public double measure(MetricConfig config, long now) {
@@ -135,6 +137,10 @@ public final class RecordAccumulator {
}
};
metrics.addMetric(metricName, availableBytes);
+
+ Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records");
+ metricName = new MetricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion", metricTags);
+ bufferExhaustedRecordSensor.add(metricName, new Rate());
}
/**