You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2018/10/03 16:56:29 UTC

[kafka] branch trunk updated: KAFKA-6123: Give client MetricsReporter auto-generated client.id (#5383)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a5335e7  KAFKA-6123: Give client MetricsReporter auto-generated client.id (#5383)
a5335e7 is described below

commit a5335e7cbd1e53790b4bb5e95b6f9027e17fbf2b
Author: Kevin Lu <lu...@berkeley.edu>
AuthorDate: Wed Oct 3 09:56:22 2018 -0700

    KAFKA-6123: Give client MetricsReporter auto-generated client.id (#5383)
---
 .../org/apache/kafka/clients/admin/KafkaAdminClient.java   |  3 ++-
 .../org/apache/kafka/clients/consumer/KafkaConsumer.java   |  7 ++++++-
 .../org/apache/kafka/clients/producer/KafkaProducer.java   |  8 +++++++-
 .../apache/kafka/clients/consumer/KafkaConsumerTest.java   | 14 ++++++++++++++
 .../apache/kafka/clients/producer/KafkaProducerTest.java   | 14 ++++++++++++++
 .../java/org/apache/kafka/test/MockMetricsReporter.java    |  3 +++
 .../connect/runtime/distributed/WorkerGroupMember.java     |  4 +++-
 .../main/java/org/apache/kafka/streams/KafkaStreams.java   |  3 ++-
 8 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7abe7ef..ceebc58 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -337,7 +337,8 @@ public class KafkaAdminClient extends AdminClient {
                 config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                 config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
             List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class);
+                MetricsReporter.class,
+                Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId));
             Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
                 .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 4cdc4f8..04b8ec2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -677,7 +677,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                     .tags(metricsTags);
             List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                    MetricsReporter.class);
+                    MetricsReporter.class,
+                    Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -2211,4 +2212,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
     }
 
+    // Visible for testing
+    String getClientId() {
+        return clientId;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b3f76ee..e249c12 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -352,7 +352,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                     .tags(metricTags);
             List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                    MetricsReporter.class);
+                    MetricsReporter.class,
+                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
@@ -1202,6 +1203,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     "by setting the " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " configuration property");
     }
 
+    // Visible for testing
+    String getClientId() {
+        return clientId;
+    }
+
     private static class ClusterAndWaitTime {
         final Cluster cluster;
         final long waitedOnMetadataMs;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8b8cea6..2a3cbe0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -138,6 +138,20 @@ public class KafkaConsumerTest {
     public ExpectedException expectedException = ExpectedException.none();
 
     @Test
+    public void testMetricsReporterAutoGeneratedClientId() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
+                props, new StringDeserializer(), new StringDeserializer());
+
+        MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metrics.reporters().get(0);
+
+        Assert.assertEquals(consumer.getClientId(), mockMetricsReporter.clientId);
+        consumer.close();
+    }
+
+    @Test
     public void testConstructorClose() {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 676aafd..dc6fe9f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -77,6 +77,20 @@ import static org.junit.Assert.fail;
 public class KafkaProducerTest {
 
     @Test
+    public void testMetricsReporterAutoGeneratedClientId() {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        KafkaProducer<String, String> producer = new KafkaProducer<>(
+                props, new StringSerializer(), new StringSerializer());
+
+        MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().get(0);
+
+        Assert.assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
+        producer.close();
+    }
+
+    @Test
     public void testConstructorWithSerializers() {
         Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
diff --git a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
index b5f3855..40521f5 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricsReporter;
 
@@ -26,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class MockMetricsReporter implements MetricsReporter {
     public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
     public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+    public String clientId;
 
     public MockMetricsReporter() {
     }
@@ -48,5 +50,6 @@ public class MockMetricsReporter implements MetricsReporter {
 
     @Override
     public void configure(Map<String, ?> configs) {
+        clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
     }
 }
\ No newline at end of file
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 525ce7e..aeed060 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -88,7 +88,9 @@ public class WorkerGroupMember {
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                     .tags(metricsTags);
-            List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+            List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+                    MetricsReporter.class,
+                    Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId));
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 82323d9..da1a1c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -659,7 +659,8 @@ public class KafkaStreams {
             .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
             .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
         final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class);
+                MetricsReporter.class,
+                Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
         reporters.add(new JmxReporter(JMX_PREFIX));
         metrics = new Metrics(metricConfig, reporters, time);