You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/10/19 14:43:30 UTC

kafka git commit: KAFKA-6069: Properly tag KafkaStreams metrics with the client id.

Repository: kafka
Updated Branches:
  refs/heads/trunk 7fdafda97 -> 249e398bf


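KAFKA-6069: Properly tag KafkaStreams metrics with the client id.

Previously StreamsKafkaClient put the StreamsConfig.CLIENT_ID_CONFIG constant itself (the name of the configuration key) into the "client-id" metric tag instead of the configured client id. The client id is now read from the StreamsConfig before the tags are built, and a new test checks that every metric seen by a test MetricsReporter carries the configured value.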

Author: Tommy Becker <to...@tivo.com>

Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@gmail.com>

Closes #4081 from twbecker/KAFKA-6069


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/249e398b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/249e398b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/249e398b

Branch: refs/heads/trunk
Commit: 249e398bf84cdd475af6529e163e78486b43c570
Parents: 7fdafda
Author: Tommy Becker <to...@tivo.com>
Authored: Thu Oct 19 15:40:26 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Oct 19 15:40:26 2017 +0100

----------------------------------------------------------------------
 .../processor/internals/StreamsKafkaClient.java |  4 +-
 .../internals/StreamsKafkaClientTest.java       | 44 ++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/249e398b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index d725ed8..1e99ad2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -111,7 +111,8 @@ public class StreamsKafkaClient {
         final Time time = new SystemTime();
 
         final Map<String, String> metricTags = new LinkedHashMap<>();
-        metricTags.put("client-id", StreamsConfig.CLIENT_ID_CONFIG);
+        final String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);
+        metricTags.put("client-id", clientId);
 
         final Metadata metadata = new Metadata(streamsConfig.getLong(
                 StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
@@ -129,7 +130,6 @@ public class StreamsKafkaClient {
         final Metrics metrics = new Metrics(metricConfig, reporters, time);
 
         final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig);
-        final String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);
         final LogContext logContext = createLogContext(clientId);
 
         final Selector selector = new Selector(

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e398b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
index 7a75b81..0bb7682 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
@@ -17,11 +17,13 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -46,6 +48,7 @@ import java.util.Map;
 import static java.util.Arrays.asList;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 
 public class StreamsKafkaClientTest {
@@ -130,6 +133,17 @@ public class StreamsKafkaClientTest {
         verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithNoOverrides, Collections.singletonMap("cleanup.policy", "delete"));
     }
 
+    @Test
+    public void metricsShouldBeTaggedWithClientId() {
+        config.put(StreamsConfig.CLIENT_ID_CONFIG, "some_client_id");
+        config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, TestMetricsReporter.class.getName());
+        StreamsKafkaClient.create(new StreamsConfig(config));
+        assertFalse(TestMetricsReporter.METRICS.isEmpty());
+        for (KafkaMetric kafkaMetric : TestMetricsReporter.METRICS.values()) {
+            assertEquals("some_client_id", kafkaMetric.metricName().tags().get("client-id"));
+        }
+    }
+
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionOnEmptyBrokerCompatibilityResponse() {
         kafkaClient.prepareResponse(null);
@@ -203,4 +217,34 @@ public class StreamsKafkaClientTest {
                                                                              kafkaClient,
                                                                              reporters);
     }
+
+
+    public static class TestMetricsReporter implements MetricsReporter {
+        static final Map<MetricName, KafkaMetric> METRICS = new HashMap<>();
+
+        @Override
+        public void configure(final Map<String, ?> configs) { }
+
+        @Override
+        public void init(final List<KafkaMetric> metrics) {
+            for (final KafkaMetric metric : metrics) {
+                metricChange(metric);
+            }
+        }
+
+        @Override
+        public void metricChange(final KafkaMetric metric) {
+            METRICS.put(metric.metricName(), metric);
+        }
+
+        @Override
+        public void metricRemoval(final KafkaMetric metric) {
+            METRICS.remove(metric.metricName());
+        }
+
+        @Override
+        public void close() {
+            METRICS.clear();
+        }
+    }
 }