You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/03/28 19:03:23 UTC

[flink] branch master updated: [FLINK-26850][metrics] Add Metric#getMetricType

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d54921  [FLINK-26850][metrics] Add Metric#getMetricType
5d54921 is described below

commit 5d54921b60060e1722115a23711d6b686567107c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Mar 23 14:17:27 2022 +0100

    [FLINK-26850][metrics] Add Metric#getMetricType
---
 .../java/org/apache/flink/metrics/Counter.java     |   5 +
 .../main/java/org/apache/flink/metrics/Gauge.java  |   5 +
 .../java/org/apache/flink/metrics/Histogram.java   |   5 +
 .../main/java/org/apache/flink/metrics/Meter.java  |   5 +
 .../main/java/org/apache/flink/metrics/Metric.java |   6 +-
 .../java/org/apache/flink/metrics/MetricType.java  |  29 ++++++
 .../flink/metrics/reporter/AbstractReporter.java   |  62 +++++++-----
 .../flink/metrics/datadog/DatadogHttpReporter.java |  72 ++++++++------
 .../dropwizard/ScheduledDropwizardReporter.java    |  94 ++++++++++--------
 .../flink/metrics/influxdb/AbstractReporter.java   |  62 +++++++-----
 .../org/apache/flink/metrics/jmx/JMXReporter.java  |  33 ++++---
 .../prometheus/AbstractPrometheusReporter.java     | 109 ++++++++++++---------
 .../metrics/prometheus/PrometheusReporterTest.java |   7 --
 13 files changed, 300 insertions(+), 194 deletions(-)

diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Counter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Counter.java
index 6d54c3b..003af69 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Counter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Counter.java
@@ -50,4 +50,9 @@ public interface Counter extends Metric {
      * @return current count
      */
     long getCount();
+
+    @Override
+    default MetricType getMetricType() {
+        return MetricType.COUNTER;
+    }
 }
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Gauge.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Gauge.java
index 2fd1796..d1bf3da 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Gauge.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Gauge.java
@@ -30,4 +30,9 @@ public interface Gauge<T> extends Metric {
      * @return calculated value
      */
     T getValue();
+
+    @Override
+    default MetricType getMetricType() {
+        return MetricType.GAUGE;
+    }
 }
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
index 914171d..cc720a4 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Histogram.java
@@ -49,4 +49,9 @@ public interface Histogram extends Metric {
      * @return Statistics about the currently recorded elements
      */
     HistogramStatistics getStatistics();
+
+    @Override
+    default MetricType getMetricType() {
+        return MetricType.HISTOGRAM;
+    }
 }
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Meter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Meter.java
index 7f14b2f..822df4a 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Meter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Meter.java
@@ -47,4 +47,9 @@ public interface Meter extends Metric {
      * @return number of events marked on the meter
      */
     long getCount();
+
+    @Override
+    default MetricType getMetricType() {
+        return MetricType.METER;
+    }
 }
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java
index b7def6a..b6907aa 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java
@@ -22,4 +22,8 @@ import org.apache.flink.annotation.Public;
 
 /** Common super interface for all metrics. */
 @Public
-public interface Metric {}
+public interface Metric {
+    default MetricType getMetricType() {
+        throw new UnsupportedOperationException("Custom metric types are not supported.");
+    }
+}
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricType.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricType.java
new file mode 100644
index 0000000..a128128
--- /dev/null
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+import org.apache.flink.annotation.Public;
+
+/** Enum describing the different metric types. */
+@Public
+public enum MetricType {
+    COUNTER,
+    METER,
+    GAUGE,
+    HISTOGRAM
+}
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
index dcce16c..2e1c0ed 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
@@ -48,19 +48,24 @@ public abstract class AbstractReporter implements MetricReporter, CharacterFilte
         final String name = group.getMetricIdentifier(metricName, this);
 
         synchronized (this) {
-            if (metric instanceof Counter) {
-                counters.put((Counter) metric, name);
-            } else if (metric instanceof Gauge) {
-                gauges.put((Gauge<?>) metric, name);
-            } else if (metric instanceof Histogram) {
-                histograms.put((Histogram) metric, name);
-            } else if (metric instanceof Meter) {
-                meters.put((Meter) metric, name);
-            } else {
-                log.warn(
-                        "Cannot add unknown metric type {}. This indicates that the reporter "
-                                + "does not support this metric type.",
-                        metric.getClass().getName());
+            switch (metric.getMetricType()) {
+                case COUNTER:
+                    counters.put((Counter) metric, name);
+                    break;
+                case GAUGE:
+                    gauges.put((Gauge<?>) metric, name);
+                    break;
+                case HISTOGRAM:
+                    histograms.put((Histogram) metric, name);
+                    break;
+                case METER:
+                    meters.put((Meter) metric, name);
+                    break;
+                default:
+                    log.warn(
+                            "Cannot add unknown metric type {}. This indicates that the reporter "
+                                    + "does not support this metric type.",
+                            metric.getClass().getName());
             }
         }
     }
@@ -68,19 +73,24 @@ public abstract class AbstractReporter implements MetricReporter, CharacterFilte
     @Override
     public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
         synchronized (this) {
-            if (metric instanceof Counter) {
-                counters.remove(metric);
-            } else if (metric instanceof Gauge) {
-                gauges.remove(metric);
-            } else if (metric instanceof Histogram) {
-                histograms.remove(metric);
-            } else if (metric instanceof Meter) {
-                meters.remove(metric);
-            } else {
-                log.warn(
-                        "Cannot remove unknown metric type {}. This indicates that the reporter "
-                                + "does not support this metric type.",
-                        metric.getClass().getName());
+            switch (metric.getMetricType()) {
+                case COUNTER:
+                    counters.remove(metric);
+                    break;
+                case GAUGE:
+                    gauges.remove(metric);
+                    break;
+                case HISTOGRAM:
+                    histograms.remove(metric);
+                    break;
+                case METER:
+                    meters.remove(metric);
+                    break;
+                default:
+                    log.warn(
+                            "Cannot remove unknown metric type {}. This indicates that the reporter "
+                                    + "does not support this metric type.",
+                            metric.getClass().getName());
             }
         }
     }
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
index 2621245..c7cab1d 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
@@ -77,42 +77,52 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
         tags.addAll(getTagsFromMetricGroup(group));
         String host = getHostFromMetricGroup(group);
 
-        if (metric instanceof Counter) {
-            Counter c = (Counter) metric;
-            counters.put(c, new DCounter(c, name, host, tags, clock));
-        } else if (metric instanceof Gauge) {
-            Gauge g = (Gauge) metric;
-            gauges.put(g, new DGauge(g, name, host, tags, clock));
-        } else if (metric instanceof Meter) {
-            Meter m = (Meter) metric;
-            // Only consider rate
-            meters.put(m, new DMeter(m, name, host, tags, clock));
-        } else if (metric instanceof Histogram) {
-            Histogram h = (Histogram) metric;
-            histograms.put(h, new DHistogram(h, name, host, tags, clock));
-        } else {
-            LOGGER.warn(
-                    "Cannot add unknown metric type {}. This indicates that the reporter "
-                            + "does not support this metric type.",
-                    metric.getClass().getName());
+        switch (metric.getMetricType()) {
+            case COUNTER:
+                Counter c = (Counter) metric;
+                counters.put(c, new DCounter(c, name, host, tags, clock));
+                break;
+            case GAUGE:
+                Gauge g = (Gauge) metric;
+                gauges.put(g, new DGauge(g, name, host, tags, clock));
+                break;
+            case METER:
+                Meter m = (Meter) metric;
+                // Only consider rate
+                meters.put(m, new DMeter(m, name, host, tags, clock));
+                break;
+            case HISTOGRAM:
+                Histogram h = (Histogram) metric;
+                histograms.put(h, new DHistogram(h, name, host, tags, clock));
+                break;
+            default:
+                LOGGER.warn(
+                        "Cannot add unknown metric type {}. This indicates that the reporter "
+                                + "does not support this metric type.",
+                        metric.getClass().getName());
         }
     }
 
     @Override
     public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
-        if (metric instanceof Counter) {
-            counters.remove(metric);
-        } else if (metric instanceof Gauge) {
-            gauges.remove(metric);
-        } else if (metric instanceof Meter) {
-            meters.remove(metric);
-        } else if (metric instanceof Histogram) {
-            histograms.remove(metric);
-        } else {
-            LOGGER.warn(
-                    "Cannot remove unknown metric type {}. This indicates that the reporter "
-                            + "does not support this metric type.",
-                    metric.getClass().getName());
+        switch (metric.getMetricType()) {
+            case COUNTER:
+                counters.remove(metric);
+                break;
+            case GAUGE:
+                gauges.remove(metric);
+                break;
+            case METER:
+                meters.remove(metric);
+                break;
+            case HISTOGRAM:
+                histograms.remove(metric);
+                break;
+            default:
+                LOGGER.warn(
+                        "Cannot remove unknown metric type {}. This indicates that the reporter "
+                                + "does not support this metric type.",
+                        metric.getClass().getName());
         }
     }
 
diff --git a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
index 855c68b..3b8cd6a 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -127,38 +127,43 @@ public abstract class ScheduledDropwizardReporter
         final String fullName = group.getMetricIdentifier(metricName, this);
 
         synchronized (this) {
-            if (metric instanceof Counter) {
-                counters.put((Counter) metric, fullName);
-                registry.register(fullName, new FlinkCounterWrapper((Counter) metric));
-            } else if (metric instanceof Gauge) {
-                gauges.put((Gauge<?>) metric, fullName);
-                registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge<?>) metric));
-            } else if (metric instanceof Histogram) {
-                Histogram histogram = (Histogram) metric;
-                histograms.put(histogram, fullName);
-
-                if (histogram instanceof DropwizardHistogramWrapper) {
-                    registry.register(
-                            fullName,
-                            ((DropwizardHistogramWrapper) histogram).getDropwizardHistogram());
-                } else {
-                    registry.register(fullName, new FlinkHistogramWrapper(histogram));
-                }
-            } else if (metric instanceof Meter) {
-                Meter meter = (Meter) metric;
-                meters.put(meter, fullName);
-
-                if (meter instanceof DropwizardMeterWrapper) {
-                    registry.register(
-                            fullName, ((DropwizardMeterWrapper) meter).getDropwizardMeter());
-                } else {
-                    registry.register(fullName, new FlinkMeterWrapper(meter));
-                }
-            } else {
-                log.warn(
-                        "Cannot add metric of type {}. This indicates that the reporter "
-                                + "does not support this metric type.",
-                        metric.getClass().getName());
+            switch (metric.getMetricType()) {
+                case COUNTER:
+                    counters.put((Counter) metric, fullName);
+                    registry.register(fullName, new FlinkCounterWrapper((Counter) metric));
+                    break;
+                case GAUGE:
+                    gauges.put((Gauge<?>) metric, fullName);
+                    registry.register(fullName, FlinkGaugeWrapper.fromGauge((Gauge<?>) metric));
+                    break;
+                case HISTOGRAM:
+                    Histogram histogram = (Histogram) metric;
+                    histograms.put(histogram, fullName);
+
+                    if (histogram instanceof DropwizardHistogramWrapper) {
+                        registry.register(
+                                fullName,
+                                ((DropwizardHistogramWrapper) histogram).getDropwizardHistogram());
+                    } else {
+                        registry.register(fullName, new FlinkHistogramWrapper(histogram));
+                    }
+                    break;
+                case METER:
+                    Meter meter = (Meter) metric;
+                    meters.put(meter, fullName);
+
+                    if (meter instanceof DropwizardMeterWrapper) {
+                        registry.register(
+                                fullName, ((DropwizardMeterWrapper) meter).getDropwizardMeter());
+                    } else {
+                        registry.register(fullName, new FlinkMeterWrapper(meter));
+                    }
+                    break;
+                default:
+                    log.warn(
+                            "Cannot add metric of type {}. This indicates that the reporter "
+                                    + "does not support this metric type.",
+                            metric.getClass().getName());
             }
         }
     }
@@ -168,16 +173,21 @@ public abstract class ScheduledDropwizardReporter
         synchronized (this) {
             String fullName;
 
-            if (metric instanceof Counter) {
-                fullName = counters.remove(metric);
-            } else if (metric instanceof Gauge) {
-                fullName = gauges.remove(metric);
-            } else if (metric instanceof Histogram) {
-                fullName = histograms.remove(metric);
-            } else if (metric instanceof Meter) {
-                fullName = meters.remove(metric);
-            } else {
-                fullName = null;
+            switch (metric.getMetricType()) {
+                case COUNTER:
+                    fullName = counters.remove(metric);
+                    break;
+                case GAUGE:
+                    fullName = gauges.remove(metric);
+                    break;
+                case HISTOGRAM:
+                    fullName = histograms.remove(metric);
+                    break;
+                case METER:
+                    fullName = meters.remove(metric);
+                    break;
+                default:
+                    fullName = null;
             }
 
             if (fullName != null) {
diff --git a/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/AbstractReporter.java b/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/AbstractReporter.java
index 9a86439..4f62786 100644
--- a/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/AbstractReporter.java
+++ b/flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/AbstractReporter.java
@@ -55,19 +55,24 @@ abstract class AbstractReporter<MetricInfo> implements MetricReporter {
     public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
         final MetricInfo metricInfo = metricInfoProvider.getMetricInfo(metricName, group);
         synchronized (this) {
-            if (metric instanceof Counter) {
-                counters.put((Counter) metric, metricInfo);
-            } else if (metric instanceof Gauge) {
-                gauges.put((Gauge<?>) metric, metricInfo);
-            } else if (metric instanceof Histogram) {
-                histograms.put((Histogram) metric, metricInfo);
-            } else if (metric instanceof Meter) {
-                meters.put((Meter) metric, metricInfo);
-            } else {
-                log.warn(
-                        "Cannot add unknown metric type {}. This indicates that the reporter "
-                                + "does not support this metric type.",
-                        metric.getClass().getName());
+            switch (metric.getMetricType()) {
+                case COUNTER:
+                    counters.put((Counter) metric, metricInfo);
+                    break;
+                case GAUGE:
+                    gauges.put((Gauge<?>) metric, metricInfo);
+                    break;
+                case HISTOGRAM:
+                    histograms.put((Histogram) metric, metricInfo);
+                    break;
+                case METER:
+                    meters.put((Meter) metric, metricInfo);
+                    break;
+                default:
+                    log.warn(
+                            "Cannot add unknown metric type {}. This indicates that the reporter "
+                                    + "does not support this metric type.",
+                            metric.getClass().getName());
             }
         }
     }
@@ -75,19 +80,24 @@ abstract class AbstractReporter<MetricInfo> implements MetricReporter {
     @Override
     public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
         synchronized (this) {
-            if (metric instanceof Counter) {
-                counters.remove(metric);
-            } else if (metric instanceof Gauge) {
-                gauges.remove(metric);
-            } else if (metric instanceof Histogram) {
-                histograms.remove(metric);
-            } else if (metric instanceof Meter) {
-                meters.remove(metric);
-            } else {
-                log.warn(
-                        "Cannot remove unknown metric type {}. This indicates that the reporter "
-                                + "does not support this metric type.",
-                        metric.getClass().getName());
+            switch (metric.getMetricType()) {
+                case COUNTER:
+                    counters.remove(metric);
+                    break;
+                case GAUGE:
+                    gauges.remove(metric);
+                    break;
+                case HISTOGRAM:
+                    histograms.remove(metric);
+                    break;
+                case METER:
+                    meters.remove(metric);
+                    break;
+                default:
+                    log.warn(
+                            "Cannot remove unknown metric type {}. This indicates that the reporter "
+                                    + "does not support this metric type.",
+                            metric.getClass().getName());
             }
         }
     }
diff --git a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index 3dd9ac0..1c311f7 100644
--- a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -124,20 +124,25 @@ public class JMXReporter implements MetricReporter {
             return;
         }
 
-        if (metric instanceof Gauge) {
-            jmxMetric = new JmxGauge((Gauge<?>) metric);
-        } else if (metric instanceof Counter) {
-            jmxMetric = new JmxCounter((Counter) metric);
-        } else if (metric instanceof Histogram) {
-            jmxMetric = new JmxHistogram((Histogram) metric);
-        } else if (metric instanceof Meter) {
-            jmxMetric = new JmxMeter((Meter) metric);
-        } else {
-            LOG.error(
-                    "Cannot add unknown metric type: {}. This indicates that the metric type "
-                            + "is not supported by this reporter.",
-                    metric.getClass().getName());
-            return;
+        switch (metric.getMetricType()) {
+            case GAUGE:
+                jmxMetric = new JmxGauge((Gauge<?>) metric);
+                break;
+            case COUNTER:
+                jmxMetric = new JmxCounter((Counter) metric);
+                break;
+            case HISTOGRAM:
+                jmxMetric = new JmxHistogram((Histogram) metric);
+                break;
+            case METER:
+                jmxMetric = new JmxMeter((Meter) metric);
+                break;
+            default:
+                LOG.error(
+                        "Cannot add unknown metric type: {}. This indicates that the metric type "
+                                + "is not supported by this reporter.",
+                        metric.getClass().getName());
+                return;
         }
 
         try {
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
index 14e863c..47af68c 100644
--- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
@@ -156,62 +156,77 @@ public abstract class AbstractPrometheusReporter implements MetricReporter {
             String scopedMetricName,
             String helpString) {
         Collector collector;
-        if (metric instanceof Gauge || metric instanceof Counter || metric instanceof Meter) {
-            collector =
-                    io.prometheus.client.Gauge.build()
-                            .name(scopedMetricName)
-                            .help(helpString)
-                            .labelNames(toArray(dimensionKeys))
-                            .create();
-        } else if (metric instanceof Histogram) {
-            collector =
-                    new HistogramSummaryProxy(
-                            (Histogram) metric,
-                            scopedMetricName,
-                            helpString,
-                            dimensionKeys,
-                            dimensionValues);
-        } else {
-            log.warn(
-                    "Cannot create collector for unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
-                    metric.getClass().getName());
-            collector = null;
+        switch (metric.getMetricType()) {
+            case GAUGE:
+            case COUNTER:
+            case METER:
+                collector =
+                        io.prometheus.client.Gauge.build()
+                                .name(scopedMetricName)
+                                .help(helpString)
+                                .labelNames(toArray(dimensionKeys))
+                                .create();
+                break;
+            case HISTOGRAM:
+                collector =
+                        new HistogramSummaryProxy(
+                                (Histogram) metric,
+                                scopedMetricName,
+                                helpString,
+                                dimensionKeys,
+                                dimensionValues);
+                break;
+            default:
+                log.warn(
+                        "Cannot create collector for unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+                        metric.getClass().getName());
+                collector = null;
         }
         return collector;
     }
 
     private void addMetric(Metric metric, List<String> dimensionValues, Collector collector) {
-        if (metric instanceof Gauge) {
-            ((io.prometheus.client.Gauge) collector)
-                    .setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
-        } else if (metric instanceof Counter) {
-            ((io.prometheus.client.Gauge) collector)
-                    .setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
-        } else if (metric instanceof Meter) {
-            ((io.prometheus.client.Gauge) collector)
-                    .setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
-        } else if (metric instanceof Histogram) {
-            ((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues);
-        } else {
-            log.warn(
-                    "Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
-                    metric.getClass().getName());
+        switch (metric.getMetricType()) {
+            case GAUGE:
+                ((io.prometheus.client.Gauge) collector)
+                        .setChild(gaugeFrom((Gauge) metric), toArray(dimensionValues));
+                break;
+            case COUNTER:
+                ((io.prometheus.client.Gauge) collector)
+                        .setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
+                break;
+            case METER:
+                ((io.prometheus.client.Gauge) collector)
+                        .setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
+                break;
+            case HISTOGRAM:
+                ((HistogramSummaryProxy) collector).addChild((Histogram) metric, dimensionValues);
+                break;
+            default:
+                log.warn(
+                        "Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+                        metric.getClass().getName());
         }
     }
 
     private void removeMetric(Metric metric, List<String> dimensionValues, Collector collector) {
-        if (metric instanceof Gauge) {
-            ((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
-        } else if (metric instanceof Counter) {
-            ((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
-        } else if (metric instanceof Meter) {
-            ((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
-        } else if (metric instanceof Histogram) {
-            ((HistogramSummaryProxy) collector).remove(dimensionValues);
-        } else {
-            log.warn(
-                    "Cannot remove unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
-                    metric.getClass().getName());
+        switch (metric.getMetricType()) {
+            case GAUGE:
+                ((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
+                break;
+            case COUNTER:
+                ((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
+                break;
+            case METER:
+                ((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
+                break;
+            case HISTOGRAM:
+                ((HistogramSummaryProxy) collector).remove(dimensionValues);
+                break;
+            default:
+                log.warn(
+                        "Cannot remove unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+                        metric.getClass().getName());
         }
     }
 
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 6273ae0..45d2b19 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -269,13 +269,6 @@ class PrometheusReporterTest {
     }
 
     @Test
-    void addingUnknownMetricTypeDoesNotThrowException() {
-        class SomeMetricType implements Metric {}
-
-        reporter.notifyOfAddedMetric(new SomeMetricType(), "name", metricGroup);
-    }
-
-    @Test
     void cannotStartTwoReportersOnSamePort() throws Exception {
         ReporterSetup setup1 = createReporterSetup("test1", portRangeProvider.next());