Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/15 08:00:24 UTC

[GitHub] [kafka] ijuma commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

ijuma commented on code in PR #13067:
URL: https://github.com/apache/kafka/pull/13067#discussion_r1070521647


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -293,14 +295,14 @@ class Partition(val topicPartition: TopicPartition,
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
   this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
 
-  private val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
+  private val tags = Map("topic" -> topic, "partition" -> partitionId.toString).asJava
 
-  newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 0, tags)
-  newGauge("InSyncReplicasCount", () => if (isLeader) partitionState.isr.size else 0, tags)
-  newGauge("UnderMinIsr", () => if (isUnderMinIsr) 1 else 0, tags)
-  newGauge("AtMinIsr", () => if (isAtMinIsr) 1 else 0, tags)
-  newGauge("ReplicasCount", () => if (isLeader) assignmentState.replicationFactor else 0, tags)
-  newGauge("LastStableOffsetLag", () => log.map(_.lastStableOffsetLag).getOrElse(0), tags)
+  Partition.metricsGroup.newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 0, tags)

Review Comment:
   `Partition.` should not be needed.



##########
core/src/test/scala/unit/kafka/metrics/MetricsTest.scala:
##########
@@ -158,7 +159,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     val path = "C:\\windows-path\\kafka-logs"
     val tags = Map("dir" -> path)
     val expectedMBeanName = Set(tags.keySet.head, ObjectName.quote(path)).mkString("=")
-    val metric = KafkaMetricsGroup.metricName("test-metric", tags)
+    val metric = new KafkaMetricsGroup(this.getClass).metricName("test-metric", tags.asJava)

Review Comment:
   Worth noting that this differs slightly from the old code, but it seems OK for what we're testing here.



##########
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+    private final Class<?> klass;
+
+    public KafkaMetricsGroup(final Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * Creates a new MetricName object for gauges, meters, etc. created for this
+     * metrics group.
+     * @param name Descriptive name of the metric.
+     * @param tags Additional attributes which mBean will have.
+     * @return Sanitized metric name object.
+     */
+    public MetricName metricName(final String name, final Map<String, String> tags) {
+        final String pkg;
+        if (klass.getPackage() == null) {
+            pkg = "";
+        } else {
+            pkg = klass.getPackage().getName();
+        }
+        final String simpleName = klass.getSimpleName().replaceAll("\\$$", "");

Review Comment:
   Can we please file a JIRA for removing this? I don't think it's required if we don't pass the class from Scala objects (versus classes).
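
   For context, a Scala `object` compiles to a JVM class whose name ends in `$`, which is what the regex in the quoted line strips. A minimal sketch of that behaviour, using plain strings in place of `klass.getSimpleName()` (the class and method names below are hypothetical and not part of the PR):

```java
class ScalaObjectNameDemo {
    // Same regex as in the diff: "\\$$" matches a literal '$' at the very end of the input.
    static String stripTrailingDollar(String simpleName) {
        return simpleName.replaceAll("\\$$", "");
    }

    public static void main(String[] args) {
        // Name a Scala object's class would report, e.g. for `object Partition`.
        System.out.println(stripTrailingDollar("Partition$")); // prints "Partition"
        // Name of an ordinary class, e.g. from classOf[Partition]: left unchanged.
        System.out.println(stripTrailingDollar("Partition"));  // prints "Partition"
    }
}
```

   If every caller passes a regular class, as with `classOf[Partition]`, the replacement never matches anything, which is why it could be dropped.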



##########
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##########
@@ -0,0 +1,161 @@
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+    private final Class<?> klass;
+
+    public KafkaMetricsGroup(final Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * Creates a new MetricName object for gauges, meters, etc. created for this
+     * metrics group.
+     * @param name Descriptive name of the metric.
+     * @param tags Additional attributes which mBean will have.
+     * @return Sanitized metric name object.
+     */
+    public MetricName metricName(final String name, final Map<String, String> tags) {
+        final String pkg;

Review Comment:
   We don't use `final` for parameters and local variables in Kafka (the Kafka Streams modules are an exception).



##########
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##########
@@ -0,0 +1,161 @@
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+    private final Class<?> klass;
+
+    public KafkaMetricsGroup(final Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * Creates a new MetricName object for gauges, meters, etc. created for this
+     * metrics group.
+     * @param name Descriptive name of the metric.
+     * @param tags Additional attributes which mBean will have.
+     * @return Sanitized metric name object.
+     */
+    public MetricName metricName(final String name, final Map<String, String> tags) {
+        final String pkg;
+        if (klass.getPackage() == null) {
+            pkg = "";
+        } else {
+            pkg = klass.getPackage().getName();
+        }
+        final String simpleName = klass.getSimpleName().replaceAll("\\$$", "");
+        return explicitMetricName(pkg, simpleName, name, tags);
+    }
+
+    public final MetricName explicitMetricName(final String group, final String typeName,
+                                               final String name, final Map<String, String> tags) {
+        final StringBuilder nameBuilder = new StringBuilder();
+        nameBuilder.append(group);
+        nameBuilder.append(":type=");
+        nameBuilder.append(typeName);
+
+        if (!name.isEmpty()) {
+            nameBuilder.append(",name=");
+            nameBuilder.append(name);
+        }
+
+        final String scope = toScope(tags).orElse(null);
+        final Optional<String> tagsName = toMBeanName(tags);
+        tagsName.ifPresent(s -> nameBuilder.append(",").append(s));
+
+        return new MetricName(group, typeName, name, scope, nameBuilder.toString());
+    }
+
+    public final <T> Gauge<T> newGauge(final String name, final Gauge<T> metric, final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newGauge(metricName(name, tags), metric);
+    }
+
+    public final <T> Gauge<T> newGauge(final String name, final Gauge<T> metric) {
+        return newGauge(name, metric, Collections.emptyMap());
+    }
+
+    public final Meter newMeter(final String name, final String eventType,
+                                final TimeUnit timeUnit, final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit);
+    }
+
+    public final Meter newMeter(final String name, final String eventType,
+                                final TimeUnit timeUnit) {
+        return newMeter(name, eventType, timeUnit, Collections.emptyMap());
+    }
+
+    public final Meter newMeter(final MetricName metricName, final String eventType, final TimeUnit timeUnit) {
+        return KafkaYammerMetrics.defaultRegistry().newMeter(metricName, eventType, timeUnit);
+    }
+
+    public final Histogram newHistogram(final String name, final boolean biased, final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newHistogram(metricName(name, tags), biased);
+    }
+
+    public final Histogram newHistogram(final String name) {
+        return newHistogram(name, true, Collections.emptyMap());
+    }
+
+    public final Histogram newHistogram(final String name, final boolean biased) {
+        return newHistogram(name, biased, Collections.emptyMap());
+    }
+
+    public final Histogram newHistogram(final String name, final Map<String, String> tags) {
+        return newHistogram(name, true, tags);
+    }
+
+    public final Timer newTimer(final String name, final TimeUnit durationUnit, final TimeUnit rateUnit,
+                                final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit);
+    }
+
+    public final Timer newTimer(final String name, final TimeUnit durationUnit, final TimeUnit rateUnit) {
+        return newTimer(name, durationUnit, rateUnit, Collections.emptyMap());
+    }
+
+    public final void removeMetric(final String name, final Map<String, String> tags) {
+        KafkaYammerMetrics.defaultRegistry().removeMetric(metricName(name, tags));
+    }
+
+    public final void removeMetric(final String name) {
+        removeMetric(name, Collections.emptyMap());
+    }
+
+    private Optional<String> toMBeanName(final Map<String, String> tags) {
+        final List<Map.Entry<String, String>> filteredTags = tags.entrySet().stream()
+                .filter(entry -> !entry.getValue().equals(""))
+                .collect(Collectors.toList());
+        if (!filteredTags.isEmpty()) {
+            final String tagsString = filteredTags.stream()
+                    .map(entry -> String.format("%s=%s", entry.getKey(), Sanitizer.jmxSanitize(entry.getValue())))

Review Comment:
   Let's use string concatenation (i.e. `+`) instead of `String.format`; the former is more efficient.
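
   A small sketch of the suggested change, assuming the goal is only to produce the same `key=value` text. The Kafka-specific `Sanitizer.jmxSanitize` call is left out so the example runs on its own, and the class and method names are hypothetical:

```java
class TagFormatDemo {
    // Shape used in the diff: the format string is parsed on every call.
    static String withFormat(String key, String value) {
        return String.format("%s=%s", key, value);
    }

    // Suggested shape: plain concatenation, no format parsing.
    static String withConcat(String key, String value) {
        return key + "=" + value;
    }

    public static void main(String[] args) {
        System.out.println(withFormat("topic", "my-topic")); // prints "topic=my-topic"
        System.out.println(withConcat("topic", "my-topic")); // prints "topic=my-topic"
    }
}
```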



##########
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##########
@@ -0,0 +1,161 @@
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+    private final Class<?> klass;
+
+    public KafkaMetricsGroup(final Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * Creates a new MetricName object for gauges, meters, etc. created for this
+     * metrics group.
+     * @param name Descriptive name of the metric.
+     * @param tags Additional attributes which mBean will have.
+     * @return Sanitized metric name object.
+     */
+    public MetricName metricName(final String name, final Map<String, String> tags) {
+        final String pkg;
+        if (klass.getPackage() == null) {
+            pkg = "";
+        } else {
+            pkg = klass.getPackage().getName();
+        }
+        final String simpleName = klass.getSimpleName().replaceAll("\\$$", "");
+        return explicitMetricName(pkg, simpleName, name, tags);
+    }
+
+    public final MetricName explicitMetricName(final String group, final String typeName,

Review Comment:
   Should this be a static method? Does it rely on any class state?
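
   In the quoted diff, `explicitMetricName` reads no fields, but it does call `toMBeanName`, which is declared as a private instance method, so that helper would presumably need to become static as well. A simplified, hypothetical sketch of a static variant that returns only the MBean name string (the real method builds a Yammer `MetricName` and also computes a scope, both omitted here):

```java
import java.util.LinkedHashMap;
import java.util.Map;

class StaticMetricNameSketch {
    // Hypothetical stand-in for explicitMetricName: uses only its arguments, so it can be static.
    static String explicitMBeanName(String group, String typeName, String name, Map<String, String> tags) {
        StringBuilder nameBuilder = new StringBuilder();
        nameBuilder.append(group).append(":type=").append(typeName);
        if (!name.isEmpty()) {
            nameBuilder.append(",name=").append(name);
        }
        // Append non-empty tags in the map's iteration order.
        for (Map.Entry<String, String> entry : tags.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                nameBuilder.append(",").append(entry.getKey()).append("=").append(entry.getValue());
            }
        }
        return nameBuilder.toString();
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("topic", "t");
        tags.put("partition", "0");
        // No instance is needed to call the method.
        System.out.println(explicitMBeanName("kafka.cluster", "Partition", "UnderReplicated", tags));
    }
}
```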



##########
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##########
@@ -0,0 +1,161 @@
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+    private final Class<?> klass;
+
+    public KafkaMetricsGroup(final Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * Creates a new MetricName object for gauges, meters, etc. created for this
+     * metrics group.
+     * @param name Descriptive name of the metric.
+     * @param tags Additional attributes which mBean will have.
+     * @return Sanitized metric name object.
+     */
+    public MetricName metricName(final String name, final Map<String, String> tags) {
+        final String pkg;
+        if (klass.getPackage() == null) {
+            pkg = "";
+        } else {
+            pkg = klass.getPackage().getName();
+        }
+        final String simpleName = klass.getSimpleName().replaceAll("\\$$", "");
+        return explicitMetricName(pkg, simpleName, name, tags);
+    }
+
+    public final MetricName explicitMetricName(final String group, final String typeName,
+                                               final String name, final Map<String, String> tags) {
+        final StringBuilder nameBuilder = new StringBuilder();
+        nameBuilder.append(group);
+        nameBuilder.append(":type=");
+        nameBuilder.append(typeName);
+
+        if (!name.isEmpty()) {
+            nameBuilder.append(",name=");
+            nameBuilder.append(name);
+        }
+
+        final String scope = toScope(tags).orElse(null);
+        final Optional<String> tagsName = toMBeanName(tags);
+        tagsName.ifPresent(s -> nameBuilder.append(",").append(s));
+
+        return new MetricName(group, typeName, name, scope, nameBuilder.toString());
+    }
+
+    public final <T> Gauge<T> newGauge(final String name, final Gauge<T> metric, final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newGauge(metricName(name, tags), metric);
+    }
+
+    public final <T> Gauge<T> newGauge(final String name, final Gauge<T> metric) {
+        return newGauge(name, metric, Collections.emptyMap());
+    }
+
+    public final Meter newMeter(final String name, final String eventType,
+                                final TimeUnit timeUnit, final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit);
+    }
+
+    public final Meter newMeter(final String name, final String eventType,
+                                final TimeUnit timeUnit) {
+        return newMeter(name, eventType, timeUnit, Collections.emptyMap());
+    }
+
+    public final Meter newMeter(final MetricName metricName, final String eventType, final TimeUnit timeUnit) {
+        return KafkaYammerMetrics.defaultRegistry().newMeter(metricName, eventType, timeUnit);
+    }
+
+    public final Histogram newHistogram(final String name, final boolean biased, final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newHistogram(metricName(name, tags), biased);
+    }
+
+    public final Histogram newHistogram(final String name) {
+        return newHistogram(name, true, Collections.emptyMap());
+    }
+
+    public final Histogram newHistogram(final String name, final boolean biased) {
+        return newHistogram(name, biased, Collections.emptyMap());
+    }
+
+    public final Histogram newHistogram(final String name, final Map<String, String> tags) {
+        return newHistogram(name, true, tags);
+    }
+
+    public final Timer newTimer(final String name, final TimeUnit durationUnit, final TimeUnit rateUnit,
+                                final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit);
+    }
+
+    public final Timer newTimer(final String name, final TimeUnit durationUnit, final TimeUnit rateUnit) {
+        return newTimer(name, durationUnit, rateUnit, Collections.emptyMap());
+    }
+
+    public final void removeMetric(final String name, final Map<String, String> tags) {
+        KafkaYammerMetrics.defaultRegistry().removeMetric(metricName(name, tags));
+    }
+
+    public final void removeMetric(final String name) {
+        removeMetric(name, Collections.emptyMap());
+    }
+
+    private Optional<String> toMBeanName(final Map<String, String> tags) {

Review Comment:
   The ordering of `tags` matters when creating the mbean name. Unfortunately, we did not use an ordered map previously, which has resulted in unspecified behavior in some cases. When the number of tags is small, the order may have been maintained because Scala has special implementations for these cases (e.g. https://www.scala-lang.org/api/2.13.x/scala/collection/immutable/Set$$Set1.html).
   
   To make this change safe, we would need to have a test that prints _all_ mbean names and verifies that they remain the same after the conversion.
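
   To illustrate why the order matters, here is a minimal sketch in which the same two tags produce different name fragments depending on iteration order. `LinkedHashMap` is used only because it makes the order explicit; the helper names are hypothetical and not part of the PR:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class TagOrderDemo {
    // Builds an insertion-ordered map from alternating key/value arguments.
    static Map<String, String> ordered(String... keysAndValues) {
        Map<String, String> tags = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keysAndValues.length; i += 2) {
            tags.put(keysAndValues[i], keysAndValues[i + 1]);
        }
        return tags;
    }

    // Joins tags as "k1=v1,k2=v2" in the map's iteration order.
    static String tagsPart(Map<String, String> tags) {
        return tags.entrySet().stream()
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(tagsPart(ordered("topic", "t", "partition", "0"))); // prints "topic=t,partition=0"
        System.out.println(tagsPart(ordered("partition", "0", "topic", "t"))); // prints "partition=0,topic=t"
    }
}
```

   The two strings differ as text even though they carry the same tags, so a before/after comparison of the full set of registered mbean names is the kind of check that would catch an accidental reordering.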



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -70,7 +70,9 @@ class DelayedOperations(topicPartition: TopicPartition,
   def numDelayedDelete: Int = deleteRecords.numDelayed
 }
 
-object Partition extends KafkaMetricsGroup {
+object Partition {
+  private val metricsGroup = new KafkaMetricsGroup(classOf[Partition])

Review Comment:
   I think we can call this and other similar `val`s `metrics`. It's a bit more concise and the `Group` suffix doesn't add any particular value.



##########
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##########
@@ -0,0 +1,161 @@
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+    private final Class<?> klass;
+
+    public KafkaMetricsGroup(final Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * Creates a new MetricName object for gauges, meters, etc. created for this
+     * metrics group.
+     * @param name Descriptive name of the metric.
+     * @param tags Additional attributes which mBean will have.
+     * @return Sanitized metric name object.
+     */
+    public MetricName metricName(final String name, final Map<String, String> tags) {
+        final String pkg;
+        if (klass.getPackage() == null) {
+            pkg = "";
+        } else {
+            pkg = klass.getPackage().getName();
+        }
+        final String simpleName = klass.getSimpleName().replaceAll("\\$$", "");
+        return explicitMetricName(pkg, simpleName, name, tags);
+    }
+
+    public final MetricName explicitMetricName(final String group, final String typeName,
+                                               final String name, final Map<String, String> tags) {
+        final StringBuilder nameBuilder = new StringBuilder();
+        nameBuilder.append(group);
+        nameBuilder.append(":type=");
+        nameBuilder.append(typeName);
+
+        if (!name.isEmpty()) {
+            nameBuilder.append(",name=");
+            nameBuilder.append(name);
+        }
+
+        final String scope = toScope(tags).orElse(null);
+        final Optional<String> tagsName = toMBeanName(tags);
+        tagsName.ifPresent(s -> nameBuilder.append(",").append(s));
+
+        return new MetricName(group, typeName, name, scope, nameBuilder.toString());
+    }
+
+    public final <T> Gauge<T> newGauge(final String name, final Gauge<T> metric, final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newGauge(metricName(name, tags), metric);
+    }
+
+    public final <T> Gauge<T> newGauge(final String name, final Gauge<T> metric) {
+        return newGauge(name, metric, Collections.emptyMap());
+    }
+
+    public final Meter newMeter(final String name, final String eventType,
+                                final TimeUnit timeUnit, final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit);
+    }
+
+    public final Meter newMeter(final String name, final String eventType,
+                                final TimeUnit timeUnit) {
+        return newMeter(name, eventType, timeUnit, Collections.emptyMap());
+    }
+
+    public final Meter newMeter(final MetricName metricName, final String eventType, final TimeUnit timeUnit) {
+        return KafkaYammerMetrics.defaultRegistry().newMeter(metricName, eventType, timeUnit);
+    }
+
+    public final Histogram newHistogram(final String name, final boolean biased, final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newHistogram(metricName(name, tags), biased);
+    }
+
+    public final Histogram newHistogram(final String name) {
+        return newHistogram(name, true, Collections.emptyMap());
+    }
+
+    public final Histogram newHistogram(final String name, final boolean biased) {
+        return newHistogram(name, biased, Collections.emptyMap());
+    }
+
+    public final Histogram newHistogram(final String name, final Map<String, String> tags) {
+        return newHistogram(name, true, tags);
+    }
+
+    public final Timer newTimer(final String name, final TimeUnit durationUnit, final TimeUnit rateUnit,
+                                final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit);
+    }
+
+    public final Timer newTimer(final String name, final TimeUnit durationUnit, final TimeUnit rateUnit) {
+        return newTimer(name, durationUnit, rateUnit, Collections.emptyMap());
+    }
+
+    public final void removeMetric(final String name, final Map<String, String> tags) {
+        KafkaYammerMetrics.defaultRegistry().removeMetric(metricName(name, tags));
+    }
+
+    public final void removeMetric(final String name) {
+        removeMetric(name, Collections.emptyMap());
+    }
+
+    private Optional<String> toMBeanName(final Map<String, String> tags) {
+        final List<Map.Entry<String, String>> filteredTags = tags.entrySet().stream()

Review Comment:
   We don't have to port this verbatim if it means unnecessary inefficiency.
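
   One way to avoid the intermediate list, sketched under assumptions: the quoted hunk is cut off before the join step, so the `","` separator below is assumed, and `Sanitizer.jmxSanitize` is omitted to keep the example self-contained. The class name is hypothetical:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

class SinglePassMBeanName {
    // Filters, formats and joins in one stream pipeline instead of collecting to a list first.
    static Optional<String> toMBeanName(Map<String, String> tags) {
        String joined = tags.entrySet().stream()
                .filter(entry -> !entry.getValue().isEmpty())
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.joining(",")); // separator is an assumption
        // Every kept entry contributes at least "=", so an empty result means no tags survived the filter.
        return joined.isEmpty() ? Optional.empty() : Optional.of(joined);
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("topic", "t");
        tags.put("partition", ""); // empty values are skipped
        System.out.println(toMBeanName(tags));                   // prints "Optional[topic=t]"
        System.out.println(toMBeanName(Collections.emptyMap())); // prints "Optional.empty"
    }
}
```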


