You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/03 16:57:00 UTC

[jira] [Commented] (KAFKA-6123) Give client MetricsReporter auto-generated client.id

    [ https://issues.apache.org/jira/browse/KAFKA-6123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16637198#comment-16637198 ] 

ASF GitHub Bot commented on KAFKA-6123:
---------------------------------------

cmccabe closed pull request #5383: KAFKA-6123: Give client MetricsReporter auto-generated client.id
URL: https://github.com/apache/kafka/pull/5383
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7abe7efd15b..ceebc58301c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -337,7 +337,8 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso
                 config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                 config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
             List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class);
+                MetricsReporter.class,
+                Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId));
             Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
                 .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 4cdc4f862ec..04b8ec21591 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -677,7 +677,8 @@ private KafkaConsumer(ConsumerConfig config,
                     .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                     .tags(metricsTags);
             List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                    MetricsReporter.class);
+                    MetricsReporter.class,
+                    Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -2211,4 +2212,8 @@ private void throwIfNoAssignorsConfigured() {
                 ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
     }
 
+    // Visible for testing
+    String getClientId() {
+        return clientId;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b3f76eee49f..e249c12d5cc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -352,7 +352,8 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
                     .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                     .tags(metricTags);
             List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                    MetricsReporter.class);
+                    MetricsReporter.class,
+                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
@@ -1202,6 +1203,11 @@ private void throwIfNoTransactionManager() {
                     "by setting the " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " configuration property");
     }
 
+    // Visible for testing
+    String getClientId() {
+        return clientId;
+    }
+
     private static class ClusterAndWaitTime {
         final Cluster cluster;
         final long waitedOnMetadataMs;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8b8cea6cce1..2a3cbe0b72d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -137,6 +137,20 @@
     @Rule
     public ExpectedException expectedException = ExpectedException.none();
 
+    @Test
+    public void testMetricsReporterAutoGeneratedClientId() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
+                props, new StringDeserializer(), new StringDeserializer());
+
+        MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metrics.reporters().get(0);
+
+        Assert.assertEquals(consumer.getClientId(), mockMetricsReporter.clientId);
+        consumer.close();
+    }
+
     @Test
     public void testConstructorClose() {
         Properties props = new Properties();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 676aafdc57d..dc6fe9f52fc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -76,6 +76,20 @@
 @PowerMockIgnore("javax.management.*")
 public class KafkaProducerTest {
 
+    @Test
+    public void testMetricsReporterAutoGeneratedClientId() {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        KafkaProducer<String, String> producer = new KafkaProducer<>(
+                props, new StringSerializer(), new StringSerializer());
+
+        MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().get(0);
+
+        Assert.assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
+        producer.close();
+    }
+
     @Test
     public void testConstructorWithSerializers() {
         Properties producerProps = new Properties();
diff --git a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
index b5f3855034f..40521f583dd 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricsReporter;
 
@@ -26,6 +27,7 @@
 public class MockMetricsReporter implements MetricsReporter {
     public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
     public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+    public String clientId;
 
     public MockMetricsReporter() {
     }
@@ -48,5 +50,6 @@ public void close() {
 
     @Override
     public void configure(Map<String, ?> configs) {
+        clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
     }
 }
\ No newline at end of file
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 525ce7e2de9..aeed060d83e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -88,7 +88,9 @@ public WorkerGroupMember(DistributedConfig config,
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                     .tags(metricsTags);
-            List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+            List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+                    MetricsReporter.class,
+                    Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId));
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 82323d9081d..da1a1c4f4f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -659,7 +659,8 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
             .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
             .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
         final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                MetricsReporter.class);
+                MetricsReporter.class,
+                Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
         reporters.add(new JmxReporter(JMX_PREFIX));
         metrics = new Metrics(metricConfig, reporters, time);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Give client MetricsReporter auto-generated client.id
> ----------------------------------------------------
>
>                 Key: KAFKA-6123
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6123
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, metrics
>            Reporter: Kevin Lu
>            Assignee: Kevin Lu
>            Priority: Minor
>              Labels: clients, metrics, newbie++
>
> The KAFKA-4756 bugfix resolved the broker's KafkaMetricsReporter missing auto-generated broker ids, but the same problem was not fixed on the client side.
>  
> Metric reporters configured for clients should also be given the auto-generated client id in the `configure` method.
> The interceptors already receive the auto-generated client id.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)