You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by wc...@apache.org on 2023/10/31 20:35:10 UTC
(kafka) branch trunk updated: KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) (#14619)
This is an automated email from the ASF dual-hosted git repository.
wcarlson pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ed3fa83d385 KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) (#14619)
ed3fa83d385 is described below
commit ed3fa83d385bbe2fa6dc4943660f56fec708bbc8
Author: Apoorv Mittal <am...@confluent.io>
AuthorDate: Tue Oct 31 20:35:02 2023 +0000
KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) (#14619)
The PR defines the naming convention for telemetry metric names for KIP-714 - jira. Telemetry metric name should be dot separated and tags should be snake case.
PR adds the interface which will be used in MetricsReporter implementation to construct metric names.
Reviewers: Xavier Léauté <xv...@apache.org>, Walker Carlson <wc...@apache.org>, Matthias J. Sax <mj...@apache.org>, Andrew Schofield <an...@uk.ibm.com>
---
.../telemetry/internals/MetricNamingStrategy.java | 85 ++++++
.../internals/TelemetryMetricNamingConvention.java | 124 +++++++++
.../TelemetryMetricNamingConventionTest.java | 301 +++++++++++++++++++++
3 files changed, 510 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingStrategy.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingStrategy.java
new file mode 100644
index 00000000000..0b5a98dd0d7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingStrategy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+/**
+ * {@code MetricNamingStrategy} provides a strategy pattern-based means of converting from an
+ * implementation-specific metric name (e.g. Kafka {@link MetricName}) representing a
+ * particular metric name and associated tags into a canonical {@link MetricKey}
+ * (name and tags) representation.
+ *
+ * <p>
+ *
+ * Each strategy may define its own conventions for how the resulting metric should be named,
+ * including things such conforming name and tags to use specific casing and separators for
+ * different parts of the metric name.
+ *
+ * <p>
+ *
+ * In general, a {@code MetricNamingStrategy} implementation is closely managed by another entity,
+ * referred to as the "telemetry reporter", as that reporter handles the conversion between different
+ * representations of metric names and keys.
+ *
+ * <p>
+ *
+ * This class is primarily used by the telemetry reporter, {@link MetricsCollector}, and
+ * {@link MetricsEmitter} layers.
+ */
+public interface MetricNamingStrategy<T> {
+
+ /**
+ * Converts the given metric name into a {@link MetricKey} representation.
+ *
+ * @param metricName Implementation-specific metric
+ * @return {@link MetricKey}
+ */
+ MetricKey metricKey(T metricName);
+
+ /**
+ * Creates a derived {@link MetricKey} from an existing {@link MetricKey}.
+ *
+ * <p>
+ *
+ * Some metrics may include multiple components derived from the same underlying source
+ * of data (e.g. a Meter that exposes multiple rates and a counter) in which case it may
+ * be desirable to create a new metric key derived from the primary one, with a different
+ * name for each component of the metric.
+ *
+ * <p>
+ *
+ * Some metrics may be derived from others by the collector itself. For example, a delta
+ * metric might be created from a cumulative counter.
+ *
+ * <p>
+ *
+ * This method exists so each strategy can define its own convention for how to name
+ * derived metrics keys.
+ *
+ * <p>
+ *
+ * The derived key should have the same tags as the input key, and its name new name
+ * will typically be composed of the input key name and the component name.
+ *
+ * @param key Input {@link MetricKey} used to construct the derived key
+ * @param derivedComponent Name to use for the derived component of the input metric
+ * @return Derived {@link MetricKey} with a new metric name composed of the input key
+ * name and the additional name
+ */
+ MetricKey derivedMetricKey(MetricKey key, String derivedComponent);
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java
new file mode 100644
index 00000000000..7d41b1519a0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a>
+ */
+public class TelemetryMetricNamingConvention {
+
+ private static final String NAME_JOINER = ".";
+ private static final String TAG_JOINER = "_";
+
+ // remove metrics as it is redundant for telemetry metrics naming convention
+ private final static Pattern GROUP_PATTERN = Pattern.compile("\\.(metrics)");
+
+ public static MetricNamingStrategy<MetricName> getClientTelemetryMetricNamingStrategy(String prefix) {
+ Objects.requireNonNull(prefix, "prefix cannot be null");
+
+ return new MetricNamingStrategy<MetricName>() {
+ @Override
+ public MetricKey metricKey(MetricName metricName) {
+ Objects.requireNonNull(metricName, "metric name cannot be null");
+
+ return new MetricKey(fullMetricName(prefix, metricName.group(), metricName.name()),
+ Collections.unmodifiableMap(cleanTags(metricName.tags())));
+ }
+
+ @Override
+ public MetricKey derivedMetricKey(MetricKey key, String derivedComponent) {
+ Objects.requireNonNull(derivedComponent, "derived component cannot be null");
+ return new MetricKey(key.getName() + NAME_JOINER + derivedComponent, key.tags());
+ }
+ };
+ }
+
+ /**
+ * Creates a metric name from the given prefix, group, and name. The new String follows the following
+ * conventions and rules:
+ *
+ * <ul>
+ * <li>prefix is expected to be a host-name like value, e.g. {@code org.apache.kafka}</li>
+ * <li>group is cleaned of redundant words: "-metrics"</li>
+ * <li>the group and metric name is dot separated</li>
+ * <li>The name is created by joining the three components, e.g.:
+ * {@code org.apache.kafka.producer.connection.creation.rate}</li>
+ * </ul>
+ */
+ private static String fullMetricName(String prefix, String group, String name) {
+ return prefix
+ + NAME_JOINER
+ + cleanGroup(group)
+ + NAME_JOINER
+ + cleanMetric(name);
+ }
+
+ /**
+ * This method maps a group name to follow conventions and cleans up the result to be more legible:
+ * <ul>
+ * <li> converts names to lower case conventions
+ * <li> normalizes artifacts of hyphen case in group name to dot separated conversion
+ * <li> strips redundant parts of the metric name, such as -metrics
+ * </ul>
+ */
+ private static String cleanGroup(String group) {
+ group = clean(group, NAME_JOINER);
+ return GROUP_PATTERN.matcher(group).replaceAll("");
+ }
+
+ /**
+ * This method maps a metric name to follow conventions and cleans up the result to be more legible:
+ * <ul>
+ * <li> converts names to lower case conventions
+ * <li> normalizes artifacts of hyphen case in metric name to dot separated conversion
+ * </ul>
+ */
+ private static String cleanMetric(String metric) {
+ return clean(metric, NAME_JOINER);
+ }
+
+ /**
+ * Converts a tag name to match the telemetry naming conventions by converting into snake_case.
+ * <p>
+ * Kafka metrics have tags name in lower case separated by hyphens. Eg: total-errors
+ *
+ * @param raw the input map
+ * @return the new map with keys replaced by snake_case representations.
+ */
+ private static Map<String, String> cleanTags(Map<String, String> raw) {
+ return raw.entrySet()
+ .stream()
+ .collect(Collectors.toMap(s -> clean(s.getKey(), TAG_JOINER), Entry::getValue));
+ }
+
+ private static String clean(String raw, String joiner) {
+ Objects.requireNonNull(raw, "metric data cannot be null");
+ String lowerCase = raw.toLowerCase(Locale.ROOT);
+ return lowerCase.replaceAll("-", joiner);
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConventionTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConventionTest.java
new file mode 100644
index 00000000000..5ac0a11d68f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConventionTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TelemetryMetricNamingConventionTest {
+
+ private MetricNamingStrategy<MetricName> metricNamingStrategy;
+
+ @BeforeEach
+ public void setUp() {
+ metricNamingStrategy = TelemetryMetricNamingConvention
+ .getClientTelemetryMetricNamingStrategy("org.apache.kafka");
+ }
+
+ @Test
+ public void testMetricKey() {
+ MetricName metricName = new MetricName("name", "group", "description",
+ Collections.emptyMap());
+ MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+ assertEquals("org.apache.kafka.group.name", metricKey.getName());
+ assertEquals(Collections.emptyMap(), metricKey.tags());
+ }
+
+ @Test
+ public void testMetricKeyWithHyphenNameAndNonEmptyTags() {
+ Map<String, String> tags = new HashMap<>();
+ tags.put("tag1", "value1");
+ tags.put("tag2", "value2");
+
+ MetricName metricName = new MetricName("test-name", "group-name", "description", tags);
+ MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+ assertEquals("org.apache.kafka.group.name.test.name", metricKey.getName());
+ assertEquals(tags, metricKey.tags());
+ }
+
+ /**
+ * Test metric key with mixed name and mixed tags where mixed represents combination of upper case,
+ * lower case, numbers, hyphen, dot, underscore and special characters.
+ */
+ @Test
+ public void testMetricKeyWithMixedNameAndMixedTags() {
+ Map<String, String> tags = new HashMap<>();
+ tags.put("tag1-Ab-2_(", "value1");
+ tags.put("tag2-HELLO.@", "value2");
+
+ MetricName metricName = new MetricName("test-Name-1.$", "grouP-name-AB_&", "description", tags);
+ MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+ tags.clear();
+ tags.put("tag1_ab_2_(", "value1");
+ tags.put("tag2_hello.@", "value2");
+
+ assertEquals("org.apache.kafka.group.name.ab_&.test.name.1.$", metricKey.getName());
+ assertEquals(tags, metricKey.tags());
+ }
+
+ @Test
+ public void testMetricKeyWithNullMetricName() {
+ Exception e = assertThrows(NullPointerException.class, () -> metricNamingStrategy.metricKey(null));
+ assertEquals("metric name cannot be null", e.getMessage());
+ }
+
+ @Test
+ public void testMetricKeyWithEmptyName() {
+ MetricName metricName = new MetricName("", "group-1A", "description",
+ Collections.emptyMap());
+ MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+ // If there is no name, then the telemetry metric name will have dot in the end though
+ // metric names always have a name.
+ assertEquals("org.apache.kafka.group.1a.", metricKey.getName());
+ assertEquals(Collections.emptyMap(), metricKey.tags());
+ }
+
+ @Test
+ public void testMetricKeyWithEmptyGroup() {
+ MetricName metricName = new MetricName("name", "", "description",
+ Collections.emptyMap());
+ MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+ // If there is no group, then the telemetry metric name will have consecutive dots, though
+ // metric names always have group name.
+ assertEquals("org.apache.kafka..name", metricKey.getName());
+ assertEquals(Collections.emptyMap(), metricKey.tags());
+ }
+
+ @Test
+ public void testMetricKeyWithAdditionalMetricsSuffixInGroup() {
+ MetricName metricName = new MetricName("name", "group-metrics", "description",
+ Collections.emptyMap());
+ MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+ // '-metrics' gets removed from the group name.
+ assertEquals("org.apache.kafka.group.name", metricKey.getName());
+ assertEquals(Collections.emptyMap(), metricKey.tags());
+ }
+
+ @Test
+ public void testMetricKeyWithMultipleMetricsSuffixInGroup() {
+ MetricName metricName = new MetricName("name-metrics", "group-metrics-metrics", "description",
+ Collections.emptyMap());
+ MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+ // '-metrics' gets removed from the group name.
+ assertEquals("org.apache.kafka.group.name.metrics", metricKey.getName());
+ assertEquals(Collections.emptyMap(), metricKey.tags());
+ }
+
+ @Test
+ public void testMetricKeyWithNullTagKey() {
+ MetricName metricName = new MetricName("name", "group", "description",
+ Collections.singletonMap(null, "value1"));
+ Exception e = assertThrows(NullPointerException.class, () -> metricNamingStrategy.metricKey(metricName));
+ assertEquals("metric data cannot be null", e.getMessage());
+ }
+
+ @Test
+ public void testMetricKeyWithBlankTagKey() {
+ MetricName metricName = new MetricName("name", "group", "description",
+ Collections.singletonMap("", "value1"));
+ MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+ assertEquals("org.apache.kafka.group.name", metricKey.getName());
+ assertEquals(Collections.singletonMap("", "value1"), metricKey.tags());
+ }
+
+ @Test
+ public void testDerivedMetricKey() {
+ MetricName metricName = new MetricName("name", "group", "description",
+ Collections.emptyMap());
+ MetricKey metricKey = metricNamingStrategy.derivedMetricKey(
+ metricNamingStrategy.metricKey(metricName), "delta");
+
+ assertEquals("org.apache.kafka.group.name.delta", metricKey.getName());
+ assertEquals(Collections.emptyMap(), metricKey.tags());
+ }
+
+ @Test
+ public void testDerivedMetricKeyWithTags() {
+ MetricName metricName = new MetricName("name", "group", "description",
+ Collections.singletonMap("tag1", "value1"));
+ MetricKey metricKey = metricNamingStrategy.derivedMetricKey(
+ metricNamingStrategy.metricKey(metricName), "delta");
+
+ assertEquals("org.apache.kafka.group.name.delta", metricKey.getName());
+ assertEquals(Collections.singletonMap("tag1", "value1"), metricKey.tags());
+ }
+
+ @Test
+ public void testDerivedMetricKeyWithNullComponent() {
+ MetricName metricName = new MetricName("name", "group", "description",
+ Collections.emptyMap());
+ Exception e = assertThrows(NullPointerException.class, () -> metricNamingStrategy.derivedMetricKey(
+ metricNamingStrategy.metricKey(metricName), null));
+ assertEquals("derived component cannot be null", e.getMessage());
+ }
+
+ @Test
+ public void testDerivedMetricKeyWithBlankComponent() {
+ MetricName metricName = new MetricName("name", "group", "description",
+ Collections.emptyMap());
+ MetricKey metricKey = metricNamingStrategy.derivedMetricKey(
+ metricNamingStrategy.metricKey(metricName), "");
+
+ // Ends with dot, though derived component should not be blank, omitting the check in the code.
+ assertEquals("org.apache.kafka.group.name.", metricKey.getName());
+ assertEquals(Collections.emptyMap(), metricKey.tags());
+ }
+
+ @Test
+ public void testNullPrefix() {
+ Exception e = assertThrows(NullPointerException.class, () -> TelemetryMetricNamingConvention
+ .getClientTelemetryMetricNamingStrategy(null));
+ assertEquals("prefix cannot be null", e.getMessage());
+ }
+
+ /**
+ * Standard producer metrics are the one's defined in KIP-714 under the section "Standard producer metrics":
+ * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Standardproducermetrics
+ */
+ @Test
+ public void testStandardProducerMetrics() {
+ assertEquals("org.apache.kafka.producer.connection.creation.rate",
+ metricNamingStrategy.metricKey(new MetricName("connection-creation-rate",
+ "producer-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.producer.connection.creation.total",
+ metricNamingStrategy.metricKey(new MetricName("connection-creation-total",
+ "producer-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.producer.node.request.latency.avg",
+ metricNamingStrategy.metricKey(new MetricName("request-latency-avg",
+ "producer-node-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.producer.node.request.latency.max",
+ metricNamingStrategy.metricKey(new MetricName("request-latency-max",
+ "producer-node-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.producer.produce.throttle.time.avg",
+ metricNamingStrategy.metricKey(new MetricName("produce-throttle-time-avg",
+ "producer-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.producer.produce.throttle.time.max",
+ metricNamingStrategy.metricKey(new MetricName("produce-throttle-time-max",
+ "producer-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.producer.record.queue.time.avg",
+ metricNamingStrategy.metricKey(new MetricName("record-queue-time-avg",
+ "producer-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.producer.record.queue.time.max",
+ metricNamingStrategy.metricKey(new MetricName("record-queue-time-max",
+ "producer-metrics", "description", Collections.emptyMap())).getName());
+ }
+
+ /**
+ * Standard consumer metrics are the one's defined in KIP-714 under the section "Standard consumer metrics":
+ * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Standardconsumermetrics
+ */
+ @Test
+ public void testStandardConsumerMetrics() {
+ assertEquals("org.apache.kafka.consumer.connection.creation.rate",
+ metricNamingStrategy.metricKey(new MetricName("connection-creation-rate",
+ "consumer-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.consumer.connection.creation.total",
+ metricNamingStrategy.metricKey(new MetricName("connection-creation-total",
+ "consumer-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.consumer.node.request.latency.avg",
+ metricNamingStrategy.metricKey(new MetricName("request-latency-avg",
+ "consumer-node-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.consumer.node.request.latency.max",
+ metricNamingStrategy.metricKey(new MetricName("request-latency-max",
+ "consumer-node-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.consumer.poll.idle.ratio.avg",
+ metricNamingStrategy.metricKey(new MetricName("poll-idle-ratio-avg",
+ "consumer-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.consumer.coordinator.commit.latency.avg",
+ metricNamingStrategy.metricKey(new MetricName("commit-latency-avg",
+ "consumer-coordinator-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.consumer.coordinator.commit.latency.max",
+ metricNamingStrategy.metricKey(new MetricName("commit-latency-max",
+ "consumer-coordinator-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.consumer.coordinator.assigned.partitions",
+ metricNamingStrategy.metricKey(new MetricName("assigned-partitions",
+ "consumer-coordinator-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.consumer.coordinator.rebalance.latency.avg",
+ metricNamingStrategy.metricKey(new MetricName("rebalance-latency-avg",
+ "consumer-coordinator-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.consumer.coordinator.rebalance.latency.max",
+ metricNamingStrategy.metricKey(new MetricName("rebalance-latency-max",
+ "consumer-coordinator-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.consumer.coordinator.rebalance.latency.total",
+ metricNamingStrategy.metricKey(new MetricName("rebalance-latency-total",
+ "consumer-coordinator-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.consumer.fetch.manager.fetch.latency.avg",
+ metricNamingStrategy.metricKey(new MetricName("fetch-latency-avg",
+ "consumer-fetch-manager-metrics", "description", Collections.emptyMap())).getName());
+
+ assertEquals("org.apache.kafka.consumer.fetch.manager.fetch.latency.max",
+ metricNamingStrategy.metricKey(new MetricName("fetch-latency-max",
+ "consumer-fetch-manager-metrics", "description", Collections.emptyMap())).getName());
+ }
+}