You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2018/10/03 16:56:29 UTC
[kafka] branch trunk updated: KAFKA-6123: Give client
MetricsReporter auto-generated client.id (#5383)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a5335e7 KAFKA-6123: Give client MetricsReporter auto-generated client.id (#5383)
a5335e7 is described below
commit a5335e7cbd1e53790b4bb5e95b6f9027e17fbf2b
Author: Kevin Lu <lu...@berkeley.edu>
AuthorDate: Wed Oct 3 09:56:22 2018 -0700
KAFKA-6123: Give client MetricsReporter auto-generated client.id (#5383)
---
.../org/apache/kafka/clients/admin/KafkaAdminClient.java | 3 ++-
.../org/apache/kafka/clients/consumer/KafkaConsumer.java | 7 ++++++-
.../org/apache/kafka/clients/producer/KafkaProducer.java | 8 +++++++-
.../apache/kafka/clients/consumer/KafkaConsumerTest.java | 14 ++++++++++++++
.../apache/kafka/clients/producer/KafkaProducerTest.java | 14 ++++++++++++++
.../java/org/apache/kafka/test/MockMetricsReporter.java | 3 +++
.../connect/runtime/distributed/WorkerGroupMember.java | 4 +++-
.../main/java/org/apache/kafka/streams/KafkaStreams.java | 3 ++-
8 files changed, 51 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7abe7ef..ceebc58 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -337,7 +337,8 @@ public class KafkaAdminClient extends AdminClient {
config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
+ MetricsReporter.class,
+ Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId));
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 4cdc4f8..04b8ec2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -677,7 +677,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricsTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
+ MetricsReporter.class,
+ Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -2211,4 +2212,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
}
+ // Visible for testing
+ String getClientId() {
+ return clientId;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b3f76ee..e249c12 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -352,7 +352,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
+ MetricsReporter.class,
+ Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
@@ -1202,6 +1203,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
"by setting the " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " configuration property");
}
+ // Visible for testing
+ String getClientId() {
+ return clientId;
+ }
+
private static class ClusterAndWaitTime {
final Cluster cluster;
final long waitedOnMetadataMs;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8b8cea6..2a3cbe0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -138,6 +138,20 @@ public class KafkaConsumerTest {
public ExpectedException expectedException = ExpectedException.none();
@Test
+ public void testMetricsReporterAutoGeneratedClientId() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
+ props, new StringDeserializer(), new StringDeserializer());
+
+ MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metrics.reporters().get(0);
+
+ Assert.assertEquals(consumer.getClientId(), mockMetricsReporter.clientId);
+ consumer.close();
+ }
+
+ @Test
public void testConstructorClose() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 676aafd..dc6fe9f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -77,6 +77,20 @@ import static org.junit.Assert.fail;
public class KafkaProducerTest {
@Test
+ public void testMetricsReporterAutoGeneratedClientId() {
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+ KafkaProducer<String, String> producer = new KafkaProducer<>(
+ props, new StringSerializer(), new StringSerializer());
+
+ MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().get(0);
+
+ Assert.assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
+ producer.close();
+ }
+
+ @Test
public void testConstructorWithSerializers() {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
diff --git a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
index b5f3855..40521f5 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.test;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
@@ -26,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class MockMetricsReporter implements MetricsReporter {
public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+ public String clientId;
public MockMetricsReporter() {
}
@@ -48,5 +50,6 @@ public class MockMetricsReporter implements MetricsReporter {
@Override
public void configure(Map<String, ?> configs) {
+ clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
}
}
\ No newline at end of file
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 525ce7e..aeed060 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -88,7 +88,9 @@ public class WorkerGroupMember {
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricsTags);
- List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+ List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+ MetricsReporter.class,
+ Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId));
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 82323d9..da1a1c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -659,7 +659,8 @@ public class KafkaStreams {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
+ MetricsReporter.class,
+ Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
reporters.add(new JmxReporter(JMX_PREFIX));
metrics = new Metrics(metricConfig, reporters, time);