You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/07 21:43:06 UTC

kafka git commit: KAFKA-2306: add another metric for buffer exhausted; reviewed by Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk ad485e148 -> a99f70feb


KAFKA-2306: add another metric for buffer exhausted; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a99f70fe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a99f70fe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a99f70fe

Branch: refs/heads/trunk
Commit: a99f70feb23db9ac4274cad9e8cbbf9934d3d075
Parents: ad485e1
Author: Dong Lin <li...@gmail.com>
Authored: Tue Jul 7 12:42:49 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Jul 7 12:42:49 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/producer/KafkaProducer.java  |  4 ++++
 .../clients/producer/internals/RecordAccumulator.java     | 10 ++++++++--
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a99f70fe/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 5671a3f..03b8dd2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -403,6 +403,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         } catch (InterruptedException e) {
             this.errors.record();
             throw new InterruptException(e);
+        } catch (BufferExhaustedException e) {
+            this.errors.record();
+            this.metrics.sensor("buffer-exhausted-records").record();
+            throw e;
         } catch (KafkaException e) {
             this.errors.record();
             throw e;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a99f70fe/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 87dbd64..a152bd7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
@@ -112,7 +114,6 @@ public final class RecordAccumulator {
     }
 
     private void registerMetrics(Metrics metrics, String metricGrpName, Map<String, String> metricTags) {
-
         MetricName metricName = new MetricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records", metricTags);
         Measurable waitingThreads = new Measurable() {
             public double measure(MetricConfig config, long now) {
@@ -120,7 +121,7 @@ public final class RecordAccumulator {
             }
         };
         metrics.addMetric(metricName, waitingThreads);
-                 
+
         metricName = new MetricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).", metricTags);
         Measurable totalBytes = new Measurable() {
             public double measure(MetricConfig config, long now) {
@@ -128,6 +129,7 @@ public final class RecordAccumulator {
             }
         };
         metrics.addMetric(metricName, totalBytes);
+
         metricName = new MetricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).", metricTags);
         Measurable availableBytes = new Measurable() {
             public double measure(MetricConfig config, long now) {
@@ -135,6 +137,10 @@ public final class RecordAccumulator {
             }
         };
         metrics.addMetric(metricName, availableBytes);
+
+        Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records");
+        metricName = new MetricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion", metricTags);
+        bufferExhaustedRecordSensor.add(metricName, new Rate());
     }
 
     /**