You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/27 16:55:26 UTC

kafka git commit: KAFKA-5597; Improve Metrics docs generation

Repository: kafka
Updated Branches:
  refs/heads/trunk 8dfdcfd02 -> 8e0e2a5b2


KAFKA-5597; Improve Metrics docs generation

Instead of having the metrics registry and the
org.apache.kafka.common.metrics.Metrics object be separate things,
have the metrics registry hold a copy of the Metrics object.
That way, all the metricInstance stuff is hidden, and we don't
have to make sure that the metrics registry and the Metrics
object are configured identicailly (with the same tags).

I personally think this looks a little better.

Author: James Cheng <jy...@yahoo.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3799 from wushujames/producer_sender_metrics_docs_different


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e0e2a5b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e0e2a5b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e0e2a5b

Branch: refs/heads/trunk
Commit: 8e0e2a5b238092a3730d22d36046fe078a01c15b
Parents: 8dfdcfd
Author: James Cheng <jy...@yahoo.com>
Authored: Wed Sep 27 17:54:29 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Sep 27 17:54:37 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   5 +-
 .../producer/internals/ProducerMetrics.java     |  25 +-
 .../clients/producer/internals/Sender.java      |  88 ++----
 .../internals/SenderMetricsRegistry.java        | 304 ++++++++++++-------
 .../clients/producer/internals/SenderTest.java  |  46 +--
 .../internals/TransactionManagerTest.java       |   4 +-
 6 files changed, 265 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8e0e2a5b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 8a1c2b7..d40a16c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -338,7 +338,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     MetricsReporter.class);
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
-            ProducerMetrics metricsRegistry = new ProducerMetrics(metricTags.keySet(), "producer");
+            ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
             this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
             if (keySerializer == null) {
@@ -391,7 +391,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
-            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics, metricsRegistry.senderMetrics);
+            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
             NetworkClient client = new NetworkClient(
                     new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                             this.metrics, time, "producer", channelBuilder, logContext),
@@ -416,7 +416,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                     acks,
                     retries,
-                    this.metrics,
                     metricsRegistry.senderMetrics,
                     Time.SYSTEM,
                     this.requestTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e0e2a5b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java
index 6b8487e..f78f3d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetrics.java
@@ -17,32 +17,37 @@
 package org.apache.kafka.clients.producer.internals;
 
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 
 import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 
 public class ProducerMetrics {
 
-    public SenderMetricsRegistry senderMetrics;
+    public final SenderMetricsRegistry senderMetrics;
+    private final Metrics metrics;
 
-    public ProducerMetrics(Set<String> tags, String metricGrpPrefix) {
-        this.senderMetrics = new SenderMetricsRegistry(tags);
+    public ProducerMetrics(Metrics metrics) {
+        this.metrics = metrics;
+        this.senderMetrics = new SenderMetricsRegistry(this.metrics);
     }
 
     private List<MetricNameTemplate> getAllTemplates() {
         List<MetricNameTemplate> l = new ArrayList<>();
-        l.addAll(this.senderMetrics.getAllTemplates());
+        l.addAll(this.senderMetrics.allTemplates());
         return l;
     }
 
     public static void main(String[] args) {
-        Set<String> tags = new HashSet<>();
-        tags.add("client-id");
-        ProducerMetrics metrics = new ProducerMetrics(tags, "producer");
-        System.out.println(Metrics.toHtmlTable("kafka.producer", metrics.getAllTemplates()));
+        Map<String, String> metricTags = Collections.singletonMap("client-id", "client-id");
+        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
+        Metrics metrics = new Metrics(metricConfig);
+
+        ProducerMetrics metricsRegistry = new ProducerMetrics(metrics);
+        System.out.println(Metrics.toHtmlTable("kafka.producer", metricsRegistry.getAllTemplates()));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e0e2a5b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1aadf3d..6d9021c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -40,7 +40,6 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
@@ -129,7 +128,6 @@ public class Sender implements Runnable {
                   int maxRequestSize,
                   short acks,
                   int retries,
-                  Metrics metrics,
                   SenderMetricsRegistry metricsRegistry,
                   Time time,
                   int requestTimeout,
@@ -146,7 +144,7 @@ public class Sender implements Runnable {
         this.acks = acks;
         this.retries = retries;
         this.time = time;
-        this.sensors = new SenderMetrics(metrics, metricsRegistry);
+        this.sensors = new SenderMetrics(metricsRegistry);
         this.requestTimeout = requestTimeout;
         this.retryBackoffMs = retryBackoffMs;
         this.apiVersions = apiVersions;
@@ -707,12 +705,10 @@ public class Sender implements Runnable {
         this.client.wakeup();
     }
 
-    public static Sensor throttleTimeSensor(Metrics metrics, SenderMetricsRegistry metricsRegistry) {
+    public static Sensor throttleTimeSensor(SenderMetricsRegistry metrics) {
         Sensor produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
-        MetricName m = metrics.metricInstance(metricsRegistry.produceThrottleTimeAvg);
-        produceThrottleTimeSensor.add(m, new Avg());
-        m = metrics.metricInstance(metricsRegistry.produceThrottleTimeMax);
-        produceThrottleTimeSensor.add(m, new Max());
+        produceThrottleTimeSensor.add(metrics.produceThrottleTimeAvg, new Avg());
+        produceThrottleTimeSensor.add(metrics.produceThrottleTimeMax, new Max());
         return produceThrottleTimeSensor;
     }
 
@@ -720,7 +716,6 @@ public class Sender implements Runnable {
      * A collection of sensors for the sender
      */
     private class SenderMetrics {
-        private final Metrics metrics;
         public final Sensor retrySensor;
         public final Sensor errorSensor;
         public final Sensor queueTimeSensor;
@@ -730,74 +725,53 @@ public class Sender implements Runnable {
         public final Sensor compressionRateSensor;
         public final Sensor maxRecordSizeSensor;
         public final Sensor batchSplitSensor;
-        private SenderMetricsRegistry metricsRegistry;
+        private final SenderMetricsRegistry metrics;
 
-        public SenderMetrics(Metrics metrics, SenderMetricsRegistry metricsRegistry) {
+        public SenderMetrics(SenderMetricsRegistry metrics) {
             this.metrics = metrics;
-            this.metricsRegistry = metricsRegistry;
 
             this.batchSizeSensor = metrics.sensor("batch-size");
-            MetricName m = metrics.metricInstance(metricsRegistry.batchSizeAvg);
-            this.batchSizeSensor.add(m, new Avg());
-            m = metrics.metricInstance(metricsRegistry.batchSizeMax);
-            this.batchSizeSensor.add(m, new Max());
+            this.batchSizeSensor.add(metrics.batchSizeAvg, new Avg());
+            this.batchSizeSensor.add(metrics.batchSizeMax, new Max());
 
             this.compressionRateSensor = metrics.sensor("compression-rate");
-            m = metrics.metricInstance(metricsRegistry.compressionRateAvg);
-            this.compressionRateSensor.add(m, new Avg());
+            this.compressionRateSensor.add(metrics.compressionRateAvg, new Avg());
 
             this.queueTimeSensor = metrics.sensor("queue-time");
-            m = metrics.metricInstance(metricsRegistry.recordQueueTimeAvg);
-            this.queueTimeSensor.add(m, new Avg());
-            m = metrics.metricInstance(metricsRegistry.recordQueueTimeMax);
-            this.queueTimeSensor.add(m, new Max());
+            this.queueTimeSensor.add(metrics.recordQueueTimeAvg, new Avg());
+            this.queueTimeSensor.add(metrics.recordQueueTimeMax, new Max());
 
             this.requestTimeSensor = metrics.sensor("request-time");
-            m = metrics.metricInstance(metricsRegistry.requestLatencyAvg);
-            this.requestTimeSensor.add(m, new Avg());
-            m = metrics.metricInstance(metricsRegistry.requestLatencyMax);
-            this.requestTimeSensor.add(m, new Max());
+            this.requestTimeSensor.add(metrics.requestLatencyAvg, new Avg());
+            this.requestTimeSensor.add(metrics.requestLatencyMax, new Max());
 
             this.recordsPerRequestSensor = metrics.sensor("records-per-request");
-            MetricName rateMetricName = metrics.metricInstance(metricsRegistry.recordSendRate);
-            MetricName totalMetricName = metrics.metricInstance(metricsRegistry.recordSendTotal);
-            this.recordsPerRequestSensor.add(new Meter(rateMetricName, totalMetricName));
-            m = metrics.metricInstance(metricsRegistry.recordsPerRequestAvg);
-            this.recordsPerRequestSensor.add(m, new Avg());
+            this.recordsPerRequestSensor.add(new Meter(metrics.recordSendRate, metrics.recordSendTotal));
+            this.recordsPerRequestSensor.add(metrics.recordsPerRequestAvg, new Avg());
 
             this.retrySensor = metrics.sensor("record-retries");
-            rateMetricName = metrics.metricInstance(metricsRegistry.recordRetryRate);
-            totalMetricName = metrics.metricInstance(metricsRegistry.recordRetryTotal);
-            this.retrySensor.add(new Meter(rateMetricName, totalMetricName));
+            this.retrySensor.add(new Meter(metrics.recordRetryRate, metrics.recordRetryTotal));
 
             this.errorSensor = metrics.sensor("errors");
-            rateMetricName = metrics.metricInstance(metricsRegistry.recordErrorRate);
-            totalMetricName = metrics.metricInstance(metricsRegistry.recordErrorTotal);
-            this.errorSensor.add(new Meter(rateMetricName, totalMetricName));
+            this.errorSensor.add(new Meter(metrics.recordErrorRate, metrics.recordErrorTotal));
 
             this.maxRecordSizeSensor = metrics.sensor("record-size");
-            m = metrics.metricInstance(metricsRegistry.recordSizeMax);
-            this.maxRecordSizeSensor.add(m, new Max());
-            m = metrics.metricInstance(metricsRegistry.recordSizeAvg);
-            this.maxRecordSizeSensor.add(m, new Avg());
+            this.maxRecordSizeSensor.add(metrics.recordSizeMax, new Max());
+            this.maxRecordSizeSensor.add(metrics.recordSizeAvg, new Avg());
 
-            m = metrics.metricInstance(metricsRegistry.requestsInFlight);
-            this.metrics.addMetric(m, new Measurable() {
+            this.metrics.addMetric(metrics.requestsInFlight, new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return client.inFlightRequestCount();
                 }
             });
-            m = metrics.metricInstance(metricsRegistry.metadataAge);
-            metrics.addMetric(m, new Measurable() {
+            metrics.addMetric(metrics.metadataAge, new Measurable() {
                 public double measure(MetricConfig config, long now) {
                     return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
                 }
             });
 
             this.batchSplitSensor = metrics.sensor("batch-split-rate");
-            rateMetricName = metrics.metricInstance(metricsRegistry.batchSplitRate);
-            totalMetricName = metrics.metricInstance(metricsRegistry.batchSplitTotal);
-            this.batchSplitSensor.add(new Meter(rateMetricName, totalMetricName));
+            this.batchSplitSensor.add(new Meter(metrics.batchSplitRate, metrics.batchSplitTotal));
         }
 
         private void maybeRegisterTopicMetrics(String topic) {
@@ -809,31 +783,31 @@ public class Sender implements Runnable {
                 Map<String, String> metricTags = Collections.singletonMap("topic", topic);
 
                 topicRecordCount = this.metrics.sensor(topicRecordsCountName);
-                MetricName rateMetricName = this.metrics.metricInstance(metricsRegistry.topicRecordSendRate, metricTags);
-                MetricName totalMetricName = this.metrics.metricInstance(metricsRegistry.topicRecordSendTotal, metricTags);
+                MetricName rateMetricName = this.metrics.topicRecordSendRate(metricTags);
+                MetricName totalMetricName = this.metrics.topicRecordSendTotal(metricTags);
                 topicRecordCount.add(new Meter(rateMetricName, totalMetricName));
 
                 String topicByteRateName = "topic." + topic + ".bytes";
                 Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
-                rateMetricName = this.metrics.metricInstance(metricsRegistry.topicByteRate, metricTags);
-                totalMetricName = this.metrics.metricInstance(metricsRegistry.topicByteTotal, metricTags);
+                rateMetricName = this.metrics.topicByteRate(metricTags);
+                totalMetricName = this.metrics.topicByteTotal(metricTags);
                 topicByteRate.add(new Meter(rateMetricName, totalMetricName));
 
                 String topicCompressionRateName = "topic." + topic + ".compression-rate";
                 Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName);
-                MetricName m = this.metrics.metricInstance(metricsRegistry.topicCompressionRate, metricTags);
+                MetricName m = this.metrics.topicCompressionRate(metricTags);
                 topicCompressionRate.add(m, new Avg());
 
                 String topicRetryName = "topic." + topic + ".record-retries";
                 Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
-                rateMetricName = this.metrics.metricInstance(metricsRegistry.topicRecordRetryRate, metricTags);
-                totalMetricName = this.metrics.metricInstance(metricsRegistry.topicRecordRetryTotal, metricTags);
+                rateMetricName = this.metrics.topicRecordRetryRate(metricTags);
+                totalMetricName = this.metrics.topicRecordRetryTotal(metricTags);
                 topicRetrySensor.add(new Meter(rateMetricName, totalMetricName));
 
                 String topicErrorName = "topic." + topic + ".record-errors";
                 Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
-                rateMetricName = this.metrics.metricInstance(metricsRegistry.topicRecordErrorRate, metricTags);
-                totalMetricName = this.metrics.metricInstance(metricsRegistry.topicRecordErrorTotal, metricTags);
+                rateMetricName = this.metrics.topicRecordErrorRate(metricTags);
+                totalMetricName = this.metrics.topicRecordErrorTotal(metricTags);
                 topicErrorSensor.add(new Meter(rateMetricName, totalMetricName));
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e0e2a5b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
index 21466e1..21dbca6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
@@ -16,134 +16,206 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
 
 public class SenderMetricsRegistry {
 
     final static String METRIC_GROUP_NAME = "producer-metrics";
     final static String TOPIC_METRIC_GROUP_NAME = "producer-topic-metrics";
 
-    public MetricNameTemplate batchSizeAvg;
-    public MetricNameTemplate batchSizeMax;
-    public MetricNameTemplate compressionRateAvg;
-    public MetricNameTemplate recordQueueTimeAvg;
-    public MetricNameTemplate recordQueueTimeMax;
-    public MetricNameTemplate requestLatencyAvg;
-    public MetricNameTemplate requestLatencyMax;
-    public MetricNameTemplate produceThrottleTimeAvg;
-    public MetricNameTemplate produceThrottleTimeMax;
-    public MetricNameTemplate recordSendRate;
-    public MetricNameTemplate recordSendTotal;
-    public MetricNameTemplate recordsPerRequestAvg;
-    public MetricNameTemplate recordRetryRate;
-    public MetricNameTemplate recordRetryTotal;
-    public MetricNameTemplate recordErrorRate;
-    public MetricNameTemplate recordErrorTotal;
-    public MetricNameTemplate recordSizeMax;
-    public MetricNameTemplate recordSizeAvg;
-    public MetricNameTemplate requestsInFlight;
-    public MetricNameTemplate metadataAge;
-    public MetricNameTemplate topicRecordSendRate;
-    public MetricNameTemplate topicRecordSendTotal;
-    public MetricNameTemplate topicByteRate;
-    public MetricNameTemplate topicByteTotal;
-    public MetricNameTemplate topicCompressionRate;
-    public MetricNameTemplate topicRecordRetryRate;
-    public MetricNameTemplate topicRecordRetryTotal;
-    public MetricNameTemplate topicRecordErrorRate;
-    public MetricNameTemplate topicRecordErrorTotal;
-    public MetricNameTemplate batchSplitRate;
-    public MetricNameTemplate batchSplitTotal;
-
-    public SenderMetricsRegistry() {
-        this(new HashSet<String>());
-    }
-
-    public SenderMetricsRegistry(Set<String> tags) {
-
-        /* ***** Client level *****/
+    private final List<MetricNameTemplate> allTemplates;
+
+    public final MetricName batchSizeAvg;
+    public final MetricName batchSizeMax;
+    public final MetricName compressionRateAvg;
+    public final MetricName recordQueueTimeAvg;
+    public final MetricName recordQueueTimeMax;
+    public final MetricName requestLatencyAvg;
+    public final MetricName requestLatencyMax;   
+    public final MetricName produceThrottleTimeAvg;
+    public final MetricName produceThrottleTimeMax;
+    public final MetricName recordSendRate;
+    public final MetricName recordSendTotal;
+    public final MetricName recordsPerRequestAvg;
+    public final MetricName recordRetryRate;
+    public final MetricName recordRetryTotal;
+    public final MetricName recordErrorRate;
+    public final MetricName recordErrorTotal;
+    public final MetricName recordSizeMax;
+    public final MetricName recordSizeAvg;
+    public final MetricName requestsInFlight;
+    public final MetricName metadataAge;
+    public final MetricName batchSplitRate;
+    public final MetricName batchSplitTotal;
+
+    private final MetricNameTemplate topicRecordSendRate;
+    private final MetricNameTemplate topicRecordSendTotal;
+    private final MetricNameTemplate topicByteRate;
+    private final MetricNameTemplate topicByteTotal;
+    private final MetricNameTemplate topicCompressionRate;
+    private final MetricNameTemplate topicRecordRetryRate;
+    private final MetricNameTemplate topicRecordRetryTotal;
+    private final MetricNameTemplate topicRecordErrorRate;
+    private final MetricNameTemplate topicRecordErrorTotal;
+    
+    private final Metrics metrics;
+    private final Set<String> tags;
+    private final HashSet<String> topicTags;
+
+    public SenderMetricsRegistry(Metrics metrics) {
+        this.metrics = metrics;
+        this.tags = this.metrics.config().tags().keySet();
+        this.allTemplates = new ArrayList<MetricNameTemplate>();
         
-        this.batchSizeAvg = new MetricNameTemplate("batch-size-avg", METRIC_GROUP_NAME, "The average number of bytes sent per partition per-request.", tags);
-        this.batchSizeMax = new MetricNameTemplate("batch-size-max", METRIC_GROUP_NAME, "The max number of bytes sent per partition per-request.", tags);
-        this.compressionRateAvg = new MetricNameTemplate("compression-rate-avg", METRIC_GROUP_NAME, "The average compression rate of record batches.", tags);
-        this.recordQueueTimeAvg = new MetricNameTemplate("record-queue-time-avg", METRIC_GROUP_NAME, "The average time in ms record batches spent in the send buffer.", tags);
-        this.recordQueueTimeMax = new MetricNameTemplate("record-queue-time-max", METRIC_GROUP_NAME, "The maximum time in ms record batches spent in the send buffer.", tags);
-        this.requestLatencyAvg = new MetricNameTemplate("request-latency-avg", METRIC_GROUP_NAME, "The average request latency in ms", tags);
-        this.requestLatencyMax = new MetricNameTemplate("request-latency-max", METRIC_GROUP_NAME, "The maximum request latency in ms", tags);
-        this.recordSendRate = new MetricNameTemplate("record-send-rate", METRIC_GROUP_NAME, "The average number of records sent per second.", tags);
-        this.recordSendTotal = new MetricNameTemplate("record-send-total", METRIC_GROUP_NAME, "The total number of records sent.", tags);
-        this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", METRIC_GROUP_NAME, "The average number of records per request.", tags);
-        this.recordRetryRate = new MetricNameTemplate("record-retry-rate", METRIC_GROUP_NAME, "The average per-second number of retried record sends", tags);
-        this.recordRetryTotal = new MetricNameTemplate("record-retry-total", METRIC_GROUP_NAME, "The total number of retried record sends", tags);
-        this.recordErrorRate = new MetricNameTemplate("record-error-rate", METRIC_GROUP_NAME, "The average per-second number of record sends that resulted in errors", tags);
-        this.recordErrorTotal = new MetricNameTemplate("record-error-total", METRIC_GROUP_NAME, "The total number of record sends that resulted in errors", tags);
-        this.recordSizeMax = new MetricNameTemplate("record-size-max", METRIC_GROUP_NAME, "The maximum record size", tags);
-        this.recordSizeAvg = new MetricNameTemplate("record-size-avg", METRIC_GROUP_NAME, "The average record size", tags);
-        this.requestsInFlight = new MetricNameTemplate("requests-in-flight", METRIC_GROUP_NAME, "The current number of in-flight requests awaiting a response.", tags);
-        this.metadataAge = new MetricNameTemplate("metadata-age", METRIC_GROUP_NAME, "The age in seconds of the current producer metadata being used.", tags);
-        this.batchSplitRate = new MetricNameTemplate("batch-split-rate", METRIC_GROUP_NAME, "The average number of batch splits per second", tags);
-        this.batchSplitTotal = new MetricNameTemplate("batch-split-total", METRIC_GROUP_NAME, "The total number of batch splits", tags);
-
-        this.produceThrottleTimeAvg = new MetricNameTemplate("produce-throttle-time-avg", METRIC_GROUP_NAME, "The average time in ms a request was throttled by a broker", tags);
-        this.produceThrottleTimeMax = new MetricNameTemplate("produce-throttle-time-max", METRIC_GROUP_NAME, "The maximum time in ms a request was throttled by a broker", tags);
-
-        /* ***** Topic level *****/
-        Set<String> topicTags = new HashSet<String>(tags);
-        topicTags.add("topic");
-
-        this.topicRecordSendRate = new MetricNameTemplate("record-send-rate", TOPIC_METRIC_GROUP_NAME, "The average number of records sent per second for a topic.", topicTags);
-        this.topicRecordSendTotal = new MetricNameTemplate("record-send-total", TOPIC_METRIC_GROUP_NAME, "The total number of records sent for a topic.", topicTags);
-        this.topicByteRate = new MetricNameTemplate("byte-rate", TOPIC_METRIC_GROUP_NAME, "The average number of bytes sent per second for a topic.", topicTags);
-        this.topicByteTotal = new MetricNameTemplate("byte-total", TOPIC_METRIC_GROUP_NAME, "The total number of bytes sent for a topic.", topicTags);
-        this.topicCompressionRate = new MetricNameTemplate("compression-rate", TOPIC_METRIC_GROUP_NAME, "The average compression rate of record batches for a topic.", topicTags);
-        this.topicRecordRetryRate = new MetricNameTemplate("record-retry-rate", TOPIC_METRIC_GROUP_NAME, "The average per-second number of retried record sends for a topic", topicTags);
-        this.topicRecordRetryTotal = new MetricNameTemplate("record-retry-total", TOPIC_METRIC_GROUP_NAME, "The total number of retried record sends for a topic", topicTags);
-        this.topicRecordErrorRate = new MetricNameTemplate("record-error-rate", TOPIC_METRIC_GROUP_NAME, "The average per-second number of record sends that resulted in errors for a topic", topicTags);
-        this.topicRecordErrorTotal = new MetricNameTemplate("record-error-total", TOPIC_METRIC_GROUP_NAME, "The total number of record sends that resulted in errors for a topic", topicTags);
-
-    }
-
-    public List<MetricNameTemplate> getAllTemplates() {
-        return Arrays.asList(this.batchSizeAvg,
-                this.batchSizeMax,
-                this.compressionRateAvg,
-                this.recordQueueTimeAvg,
-                this.recordQueueTimeMax,
-                this.requestLatencyAvg,
-                this.requestLatencyMax,
-                this.recordSendRate,
-                this.recordSendTotal,
-                this.recordsPerRequestAvg,
-                this.recordRetryRate,
-                this.recordRetryTotal,
-                this.recordErrorRate,
-                this.recordErrorTotal,
-                this.recordSizeMax,
-                this.recordSizeAvg,
-                this.requestsInFlight,
-                this.metadataAge,
-                this.batchSplitRate,
-                this.batchSplitTotal,
-                
-                this.produceThrottleTimeAvg,
-                this.produceThrottleTimeMax,
-
-                // per-topic metrics
-                this.topicRecordSendRate,
-                this.topicRecordSendTotal,
-                this.topicByteRate,
-                this.topicByteTotal,
-                this.topicCompressionRate,
-                this.topicRecordRetryRate,
-                this.topicRecordRetryTotal,
-                this.topicRecordErrorRate,
-                this.topicRecordErrorTotal
-                );
+        /***** Client level *****/
+        
+        this.batchSizeAvg = createMetricName("batch-size-avg",
+                "The average number of bytes sent per partition per-request.");
+        this.batchSizeMax = createMetricName("batch-size-max",
+                "The max number of bytes sent per partition per-request.");
+        this.compressionRateAvg = createMetricName("compression-rate-avg",
+                "The average compression rate of record batches.");
+        this.recordQueueTimeAvg = createMetricName("record-queue-time-avg",
+                "The average time in ms record batches spent in the send buffer.");
+        this.recordQueueTimeMax = createMetricName("record-queue-time-max",
+                "The maximum time in ms record batches spent in the send buffer.");
+        this.requestLatencyAvg = createMetricName("request-latency-avg", 
+                "The average request latency in ms");
+        this.requestLatencyMax = createMetricName("request-latency-max", 
+                "The maximum request latency in ms");
+        this.recordSendRate = createMetricName("record-send-rate", 
+                "The average number of records sent per second.");
+        this.recordSendTotal = createMetricName("record-send-total", 
+                "The total number of records sent.");
+        this.recordsPerRequestAvg = createMetricName("records-per-request-avg",
+                "The average number of records per request.");
+        this.recordRetryRate = createMetricName("record-retry-rate",
+                "The average per-second number of retried record sends");
+        this.recordRetryTotal = createMetricName("record-retry-total", 
+                "The total number of retried record sends");
+        this.recordErrorRate = createMetricName("record-error-rate",
+                "The average per-second number of record sends that resulted in errors");
+        this.recordErrorTotal = createMetricName("record-error-total",
+                "The total number of record sends that resulted in errors");
+        this.recordSizeMax = createMetricName("record-size-max", 
+                "The maximum record size");
+        this.recordSizeAvg = createMetricName("record-size-avg", 
+                "The average record size");
+        this.requestsInFlight = createMetricName("requests-in-flight",
+                "The current number of in-flight requests awaiting a response.");
+        this.metadataAge = createMetricName("metadata-age",
+                "The age in seconds of the current producer metadata being used.");
+        this.batchSplitRate = createMetricName("batch-split-rate", 
+                "The average number of batch splits per second");
+        this.batchSplitTotal = createMetricName("batch-split-total", 
+                "The total number of batch splits");
+
+        this.produceThrottleTimeAvg = createMetricName("produce-throttle-time-avg",
+                "The average time in ms a request was throttled by a broker");
+        this.produceThrottleTimeMax = createMetricName("produce-throttle-time-max",
+                "The maximum time in ms a request was throttled by a broker");
+
+        /***** Topic level *****/
+        this.topicTags = new HashSet<String>(tags);
+        this.topicTags.add("topic");
+
+        // We can't create the MetricName up front for these, because we don't know the topic name yet.
+        this.topicRecordSendRate = createTopicTemplate("record-send-rate",
+                "The average number of records sent per second for a topic.");
+        this.topicRecordSendTotal = createTopicTemplate("record-send-total",
+                "The total number of records sent for a topic.");
+        this.topicByteRate = createTopicTemplate("byte-rate",
+                "The average number of bytes sent per second for a topic.");
+        this.topicByteTotal = createTopicTemplate("byte-total", 
+                "The total number of bytes sent for a topic.");
+        this.topicCompressionRate = createTopicTemplate("compression-rate",
+                "The average compression rate of record batches for a topic.");
+        this.topicRecordRetryRate = createTopicTemplate("record-retry-rate",
+                "The average per-second number of retried record sends for a topic");
+        this.topicRecordRetryTotal = createTopicTemplate("record-retry-total",
+                "The total number of retried record sends for a topic");
+        this.topicRecordErrorRate = createTopicTemplate("record-error-rate",
+                "The average per-second number of record sends that resulted in errors for a topic");
+        this.topicRecordErrorTotal = createTopicTemplate("record-error-total",
+                "The total number of record sends that resulted in errors for a topic");
+
+    }
+
+    private MetricName createMetricName(String name, String description) {
+        return this.metrics.metricInstance(createTemplate(name, METRIC_GROUP_NAME, description, this.tags));
+    }
+
+    private MetricNameTemplate createTopicTemplate(String name, String description) {
+        return createTemplate(name, TOPIC_METRIC_GROUP_NAME, description, this.topicTags);
+    }
+
+    /** topic level metrics **/
+    public MetricName topicRecordSendRate(Map<String, String> tags) {
+        return this.metrics.metricInstance(this.topicRecordSendRate, tags);
+    }
+
+    public MetricName topicRecordSendTotal(Map<String, String> tags) {
+        return this.metrics.metricInstance(this.topicRecordSendTotal, tags);
+    }
+
+    public MetricName topicByteRate(Map<String, String> tags) {
+        return this.metrics.metricInstance(this.topicByteRate, tags);
+    }
+
+    public MetricName topicByteTotal(Map<String, String> tags) {
+        return this.metrics.metricInstance(this.topicByteTotal, tags);
+    }
+
+    public MetricName topicCompressionRate(Map<String, String> tags) {
+        return this.metrics.metricInstance(this.topicCompressionRate, tags);
+    }
+
+    public MetricName topicRecordRetryRate(Map<String, String> tags) {
+        return this.metrics.metricInstance(this.topicRecordRetryRate, tags);
+    }
+
+    public MetricName topicRecordRetryTotal(Map<String, String> tags) {
+        return this.metrics.metricInstance(this.topicRecordRetryTotal, tags);
+    }
+
+    public MetricName topicRecordErrorRate(Map<String, String> tags) {
+        return this.metrics.metricInstance(this.topicRecordErrorRate, tags);
+    }
+
+    public MetricName topicRecordErrorTotal(Map<String, String> tags) {
+        return this.metrics.metricInstance(this.topicRecordErrorTotal, tags);
+    }
+
+    public List<MetricNameTemplate> allTemplates() {
+        return allTemplates;
+    }
+
+    public Sensor sensor(String name) {
+        return this.metrics.sensor(name);
+    }
+
+    public void addMetric(MetricName m, Measurable measurable) {
+        this.metrics.addMetric(m, measurable);
+    }
+
+    public Sensor getSensor(String name) {
+        return this.metrics.getSensor(name);
+    }
+
+    private MetricNameTemplate createTemplate(String name, String group, String description, Set<String> tags) {
+        MetricNameTemplate template = new MetricNameTemplate(name, group, description, tags);
+        this.allTemplates.add(template);
+        return template;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e0e2a5b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index a45d9ac..afccee0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -242,7 +242,7 @@ public class SenderTest {
     @Test
     public void testQuotaMetrics() throws Exception {
         MockSelector selector = new MockSelector(time);
-        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics, this.senderMetricsRegistry);
+        Sensor throttleTimeSensor = Sender.throttleTimeSensor(this.senderMetricsRegistry);
         Cluster cluster = TestUtils.singletonCluster("test", 1);
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
@@ -270,8 +270,8 @@ public class SenderTest {
             selector.clear();
         }
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric avgMetric = allMetrics.get(metrics.metricInstance(this.senderMetricsRegistry.produceThrottleTimeAvg));
-        KafkaMetric maxMetric = allMetrics.get(metrics.metricInstance(this.senderMetricsRegistry.produceThrottleTimeMax));
+        KafkaMetric avgMetric = allMetrics.get(this.senderMetricsRegistry.produceThrottleTimeAvg);
+        KafkaMetric maxMetric = allMetrics.get(this.senderMetricsRegistry.produceThrottleTimeMax);
         // Throttle times are ApiVersions=400, Produce=(100, 200, 300)
         assertEquals(250, avgMetric.value(), EPS);
         assertEquals(400, maxMetric.value(), EPS);
@@ -283,9 +283,9 @@ public class SenderTest {
         metrics.close();
         Map<String, String> clientTags = Collections.singletonMap("client-id", "clientA");
         metrics = new Metrics(new MetricConfig().tags(clientTags));
-        SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(clientTags.keySet());
+        SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(metrics);
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
-                1, metrics, metricsRegistry, time, REQUEST_TIMEOUT, 50, null, apiVersions);
+                1, metricsRegistry, time, REQUEST_TIMEOUT, 50, null, apiVersions);
 
         // Append a message so that topic metrics are created
         accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
@@ -294,7 +294,7 @@ public class SenderTest {
         client.respond(produceResponse(tp0, 0, Errors.NONE, 0));
         sender.run(time.milliseconds());
         // Create throttle time metrics
-        Sender.throttleTimeSensor(metrics, metricsRegistry);
+        Sender.throttleTimeSensor(metricsRegistry);
 
         // Verify that all metrics except metrics-count have registered templates
         Set<MetricNameTemplate> allMetrics = new HashSet<>();
@@ -302,7 +302,7 @@ public class SenderTest {
             if (!n.group().equals("kafka-metrics-count"))
                 allMetrics.add(new MetricNameTemplate(n.name(), n.group(), "", n.tags().keySet()));
         }
-        TestUtils.checkEquals(allMetrics, new HashSet<>(metricsRegistry.getAllTemplates()), "metrics", "templates");
+        TestUtils.checkEquals(allMetrics, new HashSet<>(metricsRegistry.allTemplates()), "metrics", "templates");
     }
 
     @Test
@@ -310,9 +310,10 @@ public class SenderTest {
         // create a sender with retries = 1
         int maxRetries = 1;
         Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
         try {
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
-                    maxRetries, m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
+                    maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // do a successful retry
             Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // connect
@@ -357,9 +358,11 @@ public class SenderTest {
     public void testSendInOrder() throws Exception {
         int maxRetries = 1;
         Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
         try {
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                    m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
+                    senderMetrics, time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
             metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
@@ -1581,8 +1584,10 @@ public class SenderTest {
 
         int maxRetries = 10;
         Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+        
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -1623,9 +1628,9 @@ public class SenderTest {
 
         int maxRetries = 10;
         Metrics m = new Metrics();
-        SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                m, metricsRegistry, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // connect.
@@ -1645,7 +1650,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
         assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount());
 
-        KafkaMetric recordErrors = m.metrics().get(m.metricInstance(metricsRegistry.recordErrorRate));
+        KafkaMetric recordErrors = m.metrics().get(senderMetrics.recordErrorRate);
         assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0);
 
         assertTrue(responseFuture.isDone());
@@ -1662,8 +1667,10 @@ public class SenderTest {
 
         int maxRetries = 10;
         Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+        
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());  // connect.
@@ -1714,9 +1721,9 @@ public class SenderTest {
         try (Metrics m = new Metrics()) {
             accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
                     new ApiVersions(), txnManager);
-            SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
-                    m, metricsRegistry, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
+                    senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
             metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
@@ -1782,7 +1789,7 @@ public class SenderTest {
             assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
 
             assertTrue("There should be a split",
-                    m.metrics().get(m.metricInstance(metricsRegistry.batchSplitRate)).value() > 0);
+                    m.metrics().get(senderMetrics.batchSplitRate).value() > 0);
         }
     }
 
@@ -1843,11 +1850,10 @@ public class SenderTest {
         this.metrics = new Metrics(metricConfig, time);
         this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
                 apiVersions, transactionManager);
-
-        this.senderMetricsRegistry = new SenderMetricsRegistry(metricTags.keySet());
+        this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
 
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
-                Integer.MAX_VALUE, this.metrics, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e0e2a5b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 8fc43df..fab139a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -123,9 +123,11 @@ public class TransactionManagerTest {
         this.transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs,
                 DEFAULT_RETRY_BACKOFF_MS);
         Metrics metrics = new Metrics(metricConfig, time);
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics);
+
         this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
-                MAX_RETRIES, metrics, new SenderMetricsRegistry(metricTags.keySet()), this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+                MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
         client.setNode(brokerNode);
     }