You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by wc...@apache.org on 2023/10/31 20:35:10 UTC

(kafka) branch trunk updated: KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) (#14619)

This is an automated email from the ASF dual-hosted git repository.

wcarlson pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ed3fa83d385 KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) (#14619)
ed3fa83d385 is described below

commit ed3fa83d385bbe2fa6dc4943660f56fec708bbc8
Author: Apoorv Mittal <am...@confluent.io>
AuthorDate: Tue Oct 31 20:35:02 2023 +0000

    KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) (#14619)
    
    This PR defines the naming convention for telemetry metric names for KIP-714 (KAFKA-15669). Telemetry metric names should be dot-separated and tag names should be snake_case.
    
    The PR adds the interface that will be used by the MetricsReporter implementation to construct metric names.
    
    Reviewers: Xavier Léauté <xv...@apache.org>, Walker Carlson <wc...@apache.org>, Matthias J. Sax <mj...@apache.org>, Andrew Schofield <an...@uk.ibm.com>
---
 .../telemetry/internals/MetricNamingStrategy.java  |  85 ++++++
 .../internals/TelemetryMetricNamingConvention.java | 124 +++++++++
 .../TelemetryMetricNamingConventionTest.java       | 301 +++++++++++++++++++++
 3 files changed, 510 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingStrategy.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingStrategy.java
new file mode 100644
index 00000000000..0b5a98dd0d7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingStrategy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+/**
+ * {@code MetricNamingStrategy} provides a strategy pattern-based means of converting from an
+ * implementation-specific metric name (e.g. Kafka {@link MetricName}) representing a
+ * particular metric name and associated tags into a canonical {@link MetricKey}
+ * (name and tags) representation.
+ *
+ * <p>
+ *
+ * Each strategy may define its own conventions for how the resulting metric should be named,
+ * including things such as conforming name and tags to use specific casing and separators for
+ * different parts of the metric name.
+ *
+ * <p>
+ *
+ * In general, a {@code MetricNamingStrategy} implementation is closely managed by another entity,
+ * referred to as the "telemetry reporter", as that reporter handles the conversion between different
+ * representations of metric names and keys.
+ *
+ * <p>
+ *
+ * This class is primarily used by the telemetry reporter, {@link MetricsCollector}, and
+ * {@link MetricsEmitter} layers.
+ */
+public interface MetricNamingStrategy<T> {
+
+    /**
+     * Converts the given metric name into a {@link MetricKey} representation.
+     *
+     * @param metricName Implementation-specific metric name to convert
+     * @return {@link MetricKey} (name and tags) for the given metric name
+     */
+    MetricKey metricKey(T metricName);
+
+    /**
+     * Creates a derived {@link MetricKey} from an existing {@link MetricKey}.
+     *
+     * <p>
+     *
+     * Some metrics may include multiple components derived from the same underlying source
+     * of data (e.g. a Meter that exposes multiple rates and a counter) in which case it may
+     * be desirable to create a new metric key derived from the primary one, with a different
+     * name for each component of the metric.
+     *
+     * <p>
+     *
+     * Some metrics may be derived from others by the collector itself. For example, a delta
+     * metric might be created from a cumulative counter.
+     *
+     * <p>
+     *
+     * This method exists so each strategy can define its own convention for how to name
+     * derived metric keys.
+     *
+     * <p>
+     *
+     * The derived key should have the same tags as the input key, and its new name
+     * will typically be composed of the input key name and the component name.
+     *
+     * @param key Input {@link MetricKey} used to construct the derived key
+     * @param derivedComponent Name to use for the derived component of the input metric
+     * @return Derived {@link MetricKey} with a new metric name composed of the input key
+     * name and the derived component name
+     */
+    MetricKey derivedMetricKey(MetricKey key, String derivedComponent);
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java
new file mode 100644
index 00000000000..7d41b1519a0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConvention.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a>
+ */
+public class TelemetryMetricNamingConvention {
+
+    private static final String NAME_JOINER = ".";
+    private static final String TAG_JOINER = "_";
+
+    // Matches a complete ".metrics" segment of an already cleaned (lower-cased, dot-separated)
+    // group name, as it is redundant for the telemetry metrics naming convention. The lookahead
+    // keeps longer segments such as ".metricsfoo" intact.
+    private static final Pattern GROUP_PATTERN = Pattern.compile("\\.metrics(?=\\.|$)");
+
+    /**
+     * Creates the naming strategy used to convert Kafka {@link MetricName}s into telemetry
+     * {@link MetricKey}s. Metric names are dot separated and tag names are converted to snake_case.
+     *
+     * @param prefix prefix prepended to every metric name, e.g. {@code org.apache.kafka}
+     * @return the telemetry metric naming strategy
+     * @throws NullPointerException if {@code prefix} is null
+     */
+    public static MetricNamingStrategy<MetricName> getClientTelemetryMetricNamingStrategy(String prefix) {
+        Objects.requireNonNull(prefix, "prefix cannot be null");
+
+        return new MetricNamingStrategy<MetricName>() {
+            @Override
+            public MetricKey metricKey(MetricName metricName) {
+                Objects.requireNonNull(metricName, "metric name cannot be null");
+
+                return new MetricKey(fullMetricName(prefix, metricName.group(), metricName.name()),
+                    Collections.unmodifiableMap(cleanTags(metricName.tags())));
+            }
+
+            @Override
+            public MetricKey derivedMetricKey(MetricKey key, String derivedComponent) {
+                Objects.requireNonNull(derivedComponent, "derived component cannot be null");
+                // Fail with a descriptive message instead of a bare NPE from key.getName().
+                Objects.requireNonNull(key, "metric key cannot be null");
+                return new MetricKey(key.getName() + NAME_JOINER + derivedComponent, key.tags());
+            }
+        };
+    }
+
+    /**
+     * Creates a metric name from the given prefix, group, and name. The new String follows the following
+     * conventions and rules:
+     *
+     * <ul>
+     *   <li>prefix is expected to be a host-name like value, e.g. {@code org.apache.kafka}</li>
+     *   <li>group is cleaned of redundant words: "-metrics"</li>
+     *   <li>the group and metric name are dot separated</li>
+     *   <li>The name is created by joining the three components, e.g.:
+     *     {@code org.apache.kafka.producer.connection.creation.rate}</li>
+     * </ul>
+     *
+     * @param prefix prefix of the resulting name, used as is
+     * @param group metric group, cleaned before it is appended
+     * @param name metric name, cleaned before it is appended
+     * @return the dot separated telemetry metric name
+     */
+    private static String fullMetricName(String prefix, String group, String name) {
+        return prefix
+            + NAME_JOINER
+            + cleanGroup(group)
+            + NAME_JOINER
+            + cleanMetric(name);
+    }
+
+    /**
+     * This method maps a group name to follow conventions and cleans up the result to be more legible:
+     * <ul>
+     *  <li> converts names to lower case conventions
+     *  <li> normalizes artifacts of hyphen case in group name to dot separated conversion
+     *  <li> strips redundant parts of the group name, i.e. every whole "-metrics" segment
+     * </ul>
+     */
+    private static String cleanGroup(String group) {
+        String cleaned = clean(group, NAME_JOINER);
+        return GROUP_PATTERN.matcher(cleaned).replaceAll("");
+    }
+
+    /**
+     * This method maps a metric name to follow conventions and cleans up the result to be more legible:
+     * <ul>
+     *  <li> converts names to lower case conventions
+     *  <li> normalizes artifacts of hyphen case in metric name to dot separated conversion
+     * </ul>
+     */
+    private static String cleanMetric(String metric) {
+        return clean(metric, NAME_JOINER);
+    }
+
+    /**
+     * Converts tag names to match the telemetry naming conventions by converting them into snake_case.
+     * Tag values are copied unchanged.
+     * <p>
+     * Kafka metrics have tag names in lower case separated by hyphens, e.g. {@code total-errors}.
+     *
+     * @param raw the input map
+     * @return the new map with keys replaced by snake_case representations.
+     * @throws NullPointerException if a tag name is null
+     * @throws IllegalStateException if two tag names are converted to the same snake_case key
+     */
+    private static Map<String, String> cleanTags(Map<String, String> raw) {
+        return raw.entrySet()
+            .stream()
+            .collect(Collectors.toMap(s -> clean(s.getKey(), TAG_JOINER), Entry::getValue));
+    }
+
+    /**
+     * Lower-cases the raw value (locale independent) and replaces every hyphen with the joiner.
+     *
+     * @param raw the value to clean
+     * @param joiner the separator that replaces each hyphen
+     * @return the cleaned value
+     * @throws NullPointerException if {@code raw} is null
+     */
+    private static String clean(String raw, String joiner) {
+        Objects.requireNonNull(raw, "metric data cannot be null");
+        String lowerCase = raw.toLowerCase(Locale.ROOT);
+        // Literal replacement: unlike String#replaceAll, no regex is compiled on every call and
+        // the joiner is never interpreted as a regex replacement pattern.
+        return lowerCase.replace("-", joiner);
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConventionTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConventionTest.java
new file mode 100644
index 00000000000..5ac0a11d68f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/TelemetryMetricNamingConventionTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TelemetryMetricNamingConventionTest {
+
+    private MetricNamingStrategy<MetricName> metricNamingStrategy;
+
+    @BeforeEach
+    public void setUp() {
+        metricNamingStrategy = TelemetryMetricNamingConvention
+            .getClientTelemetryMetricNamingStrategy("org.apache.kafka");
+    }
+
+    @Test
+    public void testMetricKey() {
+        MetricName metricName = new MetricName("name", "group", "description",
+            Collections.emptyMap());
+        MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+        assertEquals("org.apache.kafka.group.name", metricKey.getName());
+        assertEquals(Collections.emptyMap(), metricKey.tags());
+    }
+
+    @Test
+    public void testMetricKeyWithHyphenNameAndNonEmptyTags() {
+        Map<String, String> tags = new HashMap<>();
+        tags.put("tag1", "value1");
+        tags.put("tag2", "value2");
+
+        MetricName metricName = new MetricName("test-name", "group-name", "description", tags);
+        MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+        assertEquals("org.apache.kafka.group.name.test.name", metricKey.getName());
+        assertEquals(tags, metricKey.tags());
+    }
+
+    /**
+     * Test metric key with mixed name and mixed tags, where mixed represents a combination of upper case,
+     * lower case, numbers, hyphen, dot, underscore and special characters. Only the case and the
+     * hyphens are expected to change; all other characters are kept as they are.
+     */
+    @Test
+    public void testMetricKeyWithMixedNameAndMixedTags() {
+        Map<String, String> tags = new HashMap<>();
+        tags.put("tag1-Ab-2_(", "value1");
+        tags.put("tag2-HELLO.@", "value2");
+
+        MetricName metricName = new MetricName("test-Name-1.$", "grouP-name-AB_&", "description", tags);
+        MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+        tags.clear();
+        tags.put("tag1_ab_2_(", "value1");
+        tags.put("tag2_hello.@", "value2");
+
+        assertEquals("org.apache.kafka.group.name.ab_&.test.name.1.$", metricKey.getName());
+        assertEquals(tags, metricKey.tags());
+    }
+
+    @Test
+    public void testMetricKeyWithNullMetricName() {
+        Exception e = assertThrows(NullPointerException.class, () -> metricNamingStrategy.metricKey(null));
+        assertEquals("metric name cannot be null", e.getMessage());
+    }
+
+    @Test
+    public void testMetricKeyWithEmptyName() {
+        MetricName metricName = new MetricName("", "group-1A", "description",
+            Collections.emptyMap());
+        MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+        // An empty name is not rejected: the telemetry metric name then simply ends with a dot.
+        // Metric names are always expected to have a name, so this documents the edge case only.
+        assertEquals("org.apache.kafka.group.1a.", metricKey.getName());
+        assertEquals(Collections.emptyMap(), metricKey.tags());
+    }
+
+    @Test
+    public void testMetricKeyWithEmptyGroup() {
+        MetricName metricName = new MetricName("name", "", "description",
+            Collections.emptyMap());
+        MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+        // An empty group is not rejected: the telemetry metric name then has consecutive dots.
+        // Metric names are always expected to have a group, so this documents the edge case only.
+        assertEquals("org.apache.kafka..name", metricKey.getName());
+        assertEquals(Collections.emptyMap(), metricKey.tags());
+    }
+
+    @Test
+    public void testMetricKeyWithAdditionalMetricsSuffixInGroup() {
+        MetricName metricName = new MetricName("name", "group-metrics", "description",
+            Collections.emptyMap());
+        MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+        // '-metrics' gets removed from the group name.
+        assertEquals("org.apache.kafka.group.name", metricKey.getName());
+        assertEquals(Collections.emptyMap(), metricKey.tags());
+    }
+
+    @Test
+    public void testMetricKeyWithMultipleMetricsSuffixInGroup() {
+        MetricName metricName = new MetricName("name-metrics", "group-metrics-metrics", "description",
+            Collections.emptyMap());
+        MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+        // Every '-metrics' gets removed from the group name; '-metrics' in the metric name is kept.
+        assertEquals("org.apache.kafka.group.name.metrics", metricKey.getName());
+        assertEquals(Collections.emptyMap(), metricKey.tags());
+    }
+
+    @Test
+    public void testMetricKeyWithNullTagKey() {
+        MetricName metricName = new MetricName("name", "group", "description",
+            Collections.singletonMap(null, "value1"));
+        Exception e = assertThrows(NullPointerException.class, () -> metricNamingStrategy.metricKey(metricName));
+        assertEquals("metric data cannot be null", e.getMessage());
+    }
+
+    @Test
+    public void testMetricKeyWithBlankTagKey() {
+        MetricName metricName = new MetricName("name", "group", "description",
+            Collections.singletonMap("", "value1"));
+        MetricKey metricKey = metricNamingStrategy.metricKey(metricName);
+
+        assertEquals("org.apache.kafka.group.name", metricKey.getName());
+        assertEquals(Collections.singletonMap("", "value1"), metricKey.tags());
+    }
+
+    @Test
+    public void testDerivedMetricKey() {
+        MetricName metricName = new MetricName("name", "group", "description",
+            Collections.emptyMap());
+        MetricKey metricKey = metricNamingStrategy.derivedMetricKey(
+            metricNamingStrategy.metricKey(metricName), "delta");
+
+        assertEquals("org.apache.kafka.group.name.delta", metricKey.getName());
+        assertEquals(Collections.emptyMap(), metricKey.tags());
+    }
+
+    @Test
+    public void testDerivedMetricKeyWithTags() {
+        MetricName metricName = new MetricName("name", "group", "description",
+            Collections.singletonMap("tag1", "value1"));
+        MetricKey metricKey = metricNamingStrategy.derivedMetricKey(
+            metricNamingStrategy.metricKey(metricName), "delta");
+
+        assertEquals("org.apache.kafka.group.name.delta", metricKey.getName());
+        assertEquals(Collections.singletonMap("tag1", "value1"), metricKey.tags());
+    }
+
+    @Test
+    public void testDerivedMetricKeyWithNullComponent() {
+        MetricName metricName = new MetricName("name", "group", "description",
+            Collections.emptyMap());
+        Exception e = assertThrows(NullPointerException.class, () -> metricNamingStrategy.derivedMetricKey(
+            metricNamingStrategy.metricKey(metricName), null));
+        assertEquals("derived component cannot be null", e.getMessage());
+    }
+
+    @Test
+    public void testDerivedMetricKeyWithBlankComponent() {
+        MetricName metricName = new MetricName("name", "group", "description",
+            Collections.emptyMap());
+        MetricKey metricKey = metricNamingStrategy.derivedMetricKey(
+            metricNamingStrategy.metricKey(metricName), "");
+
+        // The name ends with a dot: a derived component should not be blank, but the code omits that check.
+        assertEquals("org.apache.kafka.group.name.", metricKey.getName());
+        assertEquals(Collections.emptyMap(), metricKey.tags());
+    }
+
+    @Test
+    public void testNullPrefix() {
+        Exception e = assertThrows(NullPointerException.class, () -> TelemetryMetricNamingConvention
+            .getClientTelemetryMetricNamingStrategy(null));
+        assertEquals("prefix cannot be null", e.getMessage());
+    }
+
+    /**
+     * Standard producer metrics are the ones defined in KIP-714 under the section "Standard producer metrics":
+     * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Standardproducermetrics
+     */
+    @Test
+    public void testStandardProducerMetrics() {
+        assertEquals("org.apache.kafka.producer.connection.creation.rate",
+            metricNamingStrategy.metricKey(new MetricName("connection-creation-rate",
+                "producer-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.producer.connection.creation.total",
+            metricNamingStrategy.metricKey(new MetricName("connection-creation-total",
+                "producer-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.producer.node.request.latency.avg",
+            metricNamingStrategy.metricKey(new MetricName("request-latency-avg",
+                "producer-node-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.producer.node.request.latency.max",
+            metricNamingStrategy.metricKey(new MetricName("request-latency-max",
+                "producer-node-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.producer.produce.throttle.time.avg",
+            metricNamingStrategy.metricKey(new MetricName("produce-throttle-time-avg",
+                "producer-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.producer.produce.throttle.time.max",
+            metricNamingStrategy.metricKey(new MetricName("produce-throttle-time-max",
+                "producer-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.producer.record.queue.time.avg",
+            metricNamingStrategy.metricKey(new MetricName("record-queue-time-avg",
+                "producer-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.producer.record.queue.time.max",
+            metricNamingStrategy.metricKey(new MetricName("record-queue-time-max",
+                "producer-metrics", "description", Collections.emptyMap())).getName());
+    }
+
+    /**
+     * Standard consumer metrics are the ones defined in KIP-714 under the section "Standard consumer metrics":
+     * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Standardconsumermetrics
+     */
+    @Test
+    public void testStandardConsumerMetrics() {
+        assertEquals("org.apache.kafka.consumer.connection.creation.rate",
+            metricNamingStrategy.metricKey(new MetricName("connection-creation-rate",
+                "consumer-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.consumer.connection.creation.total",
+            metricNamingStrategy.metricKey(new MetricName("connection-creation-total",
+                "consumer-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.consumer.node.request.latency.avg",
+            metricNamingStrategy.metricKey(new MetricName("request-latency-avg",
+                "consumer-node-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.consumer.node.request.latency.max",
+            metricNamingStrategy.metricKey(new MetricName("request-latency-max",
+                "consumer-node-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.consumer.poll.idle.ratio.avg",
+            metricNamingStrategy.metricKey(new MetricName("poll-idle-ratio-avg",
+                "consumer-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.consumer.coordinator.commit.latency.avg",
+            metricNamingStrategy.metricKey(new MetricName("commit-latency-avg",
+                "consumer-coordinator-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.consumer.coordinator.commit.latency.max",
+            metricNamingStrategy.metricKey(new MetricName("commit-latency-max",
+                "consumer-coordinator-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.consumer.coordinator.assigned.partitions",
+            metricNamingStrategy.metricKey(new MetricName("assigned-partitions",
+                "consumer-coordinator-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.consumer.coordinator.rebalance.latency.avg",
+            metricNamingStrategy.metricKey(new MetricName("rebalance-latency-avg",
+                "consumer-coordinator-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.consumer.coordinator.rebalance.latency.max",
+            metricNamingStrategy.metricKey(new MetricName("rebalance-latency-max",
+                "consumer-coordinator-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.consumer.coordinator.rebalance.latency.total",
+            metricNamingStrategy.metricKey(new MetricName("rebalance-latency-total",
+                "consumer-coordinator-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.consumer.fetch.manager.fetch.latency.avg",
+            metricNamingStrategy.metricKey(new MetricName("fetch-latency-avg",
+                "consumer-fetch-manager-metrics", "description", Collections.emptyMap())).getName());
+
+        assertEquals("org.apache.kafka.consumer.fetch.manager.fetch.latency.max",
+            metricNamingStrategy.metricKey(new MetricName("fetch-latency-max",
+                "consumer-fetch-manager-metrics", "description", Collections.emptyMap())).getName());
+    }
+}