You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/10/19 14:43:30 UTC
kafka git commit: KAFKA-6069: Properly tag KafkaStreams metrics with the client id.
Repository: kafka
Updated Branches:
refs/heads/trunk 7fdafda97 -> 249e398bf
KAFKA-6069: Properly tag KafkaStreams metrics with the client id.
Author: Tommy Becker <to...@tivo.com>
Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@gmail.com>
Closes #4081 from twbecker/KAFKA-6069
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/249e398b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/249e398b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/249e398b
Branch: refs/heads/trunk
Commit: 249e398bf84cdd475af6529e163e78486b43c570
Parents: 7fdafda
Author: Tommy Becker <to...@tivo.com>
Authored: Thu Oct 19 15:40:26 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Oct 19 15:40:26 2017 +0100
----------------------------------------------------------------------
.../processor/internals/StreamsKafkaClient.java | 4 +-
.../internals/StreamsKafkaClientTest.java | 44 ++++++++++++++++++++
2 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e398b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index d725ed8..1e99ad2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -111,7 +111,8 @@ public class StreamsKafkaClient {
final Time time = new SystemTime();
final Map<String, String> metricTags = new LinkedHashMap<>();
- metricTags.put("client-id", StreamsConfig.CLIENT_ID_CONFIG);
+ final String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);
+ metricTags.put("client-id", clientId);
final Metadata metadata = new Metadata(streamsConfig.getLong(
StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
@@ -129,7 +130,6 @@ public class StreamsKafkaClient {
final Metrics metrics = new Metrics(metricConfig, reporters, time);
final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig);
- final String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);
final LogContext logContext = createLogContext(clientId);
final Selector selector = new Selector(
http://git-wip-us.apache.org/repos/asf/kafka/blob/249e398b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
index 7a75b81..0bb7682 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
@@ -17,11 +17,13 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -46,6 +48,7 @@ import java.util.Map;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
public class StreamsKafkaClientTest {
@@ -130,6 +133,17 @@ public class StreamsKafkaClientTest {
verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithNoOverrides, Collections.singletonMap("cleanup.policy", "delete"));
}
+ @Test
+ public void metricsShouldBeTaggedWithClientId() { // KAFKA-6069 regression test: the "client-id" metric tag must hold the configured client id value
+ config.put(StreamsConfig.CLIENT_ID_CONFIG, "some_client_id");
+ config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, TestMetricsReporter.class.getName()); // reporter is named via config; presumably instantiated inside create() - confirm
+ StreamsKafkaClient.create(new StreamsConfig(config));
+ assertFalse(TestMetricsReporter.METRICS.isEmpty()); // guard against a vacuous pass: the loop below asserts nothing when no metric was captured
+ for (KafkaMetric kafkaMetric : TestMetricsReporter.METRICS.values()) {
+ assertEquals("some_client_id", kafkaMetric.metricName().tags().get("client-id")); // every captured metric must carry the configured id
+ }
+ }
+
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnEmptyBrokerCompatibilityResponse() {
kafkaClient.prepareResponse(null);
@@ -203,4 +217,34 @@ public class StreamsKafkaClientTest {
kafkaClient,
reporters);
}
+
+
+ public static class TestMetricsReporter implements MetricsReporter { // test-only reporter that records every metric it is told about
+ static final Map<MetricName, KafkaMetric> METRICS = new HashMap<>(); // static so the test can read it without a reference to the reporter instance
+
+ @Override
+ public void configure(final Map<String, ?> configs) { } // intentionally empty: this reporter reads no configuration
+
+ @Override
+ public void init(final List<KafkaMetric> metrics) { // record the initial metrics the same way as later changes
+ for (final KafkaMetric metric : metrics) {
+ metricChange(metric);
+ }
+ }
+
+ @Override
+ public void metricChange(final KafkaMetric metric) {
+ METRICS.put(metric.metricName(), metric); // keyed by metric name, so a repeated name replaces the earlier entry
+ }
+
+ @Override
+ public void metricRemoval(final KafkaMetric metric) {
+ METRICS.remove(metric.metricName()); // drop the entry recorded under this metric's name
+ }
+
+ @Override
+ public void close() {
+ METRICS.clear(); // discard everything captured so far
+ }
+ }
}