You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/08/05 13:10:24 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28749] Include resource type in KubernetesOperatorResourceNamespaceMetricGroup

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new c91ebbd1 [FLINK-28749] Include resource type in KubernetesOperatorResourceNamespaceMetricGroup
c91ebbd1 is described below

commit c91ebbd1ae989749948dc6de69255be70c5b7557
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Fri Aug 5 15:10:19 2022 +0200

    [FLINK-28749] Include resource type in KubernetesOperatorResourceNamespaceMetricGroup
---
 .../kubernetes_operator_metric_configuration.html  |   4 +-
 .../operator/metrics/FlinkDeploymentMetrics.java   |   6 +-
 .../operator/metrics/FlinkSessionJobMetrics.java   |   4 +-
 .../metrics/KubernetesOperatorMetricGroup.java     |  11 +-
 .../metrics/KubernetesOperatorMetricOptions.java   |   5 +-
 .../KubernetesResourceNamespaceMetricGroup.java    |   9 +-
 .../KubernetesResourceNamespaceScopeFormat.java    |  12 ++-
 .../metrics/KubernetesResourceScopeFormat.java     |  11 +-
 .../operator/metrics/OperatorJosdkMetrics.java     |  90 +++++++++++-----
 .../metrics/lifecycle/LifecycleMetrics.java        |  15 +--
 .../metrics/FlinkDeploymentMetricsTest.java        |  69 ++++++++++---
 .../metrics/FlinkSessionJobMetricsTest.java        |  12 ++-
 .../metrics/KubernetesOperatorMetricGroupTest.java |  26 +++--
 .../operator/metrics/OperatorJosdkMetricsTest.java | 113 +++++++++++----------
 .../operator/metrics/TestingMetricListener.java    |  23 ++++-
 15 files changed, 271 insertions(+), 139 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
index 74a02191..57859b88 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
@@ -52,13 +52,13 @@
         </tr>
         <tr>
             <td><h5>metrics.scope.k8soperator.resource</h5></td>
-            <td style="word-wrap: break-word;">"&lt;host&gt;.k8soperator.&lt;namespace&gt;.&lt;name&gt;.resource.&lt;resourcens&gt;.&lt;resourcename&gt;"</td>
+            <td style="word-wrap: break-word;">"&lt;host&gt;.k8soperator.&lt;namespace&gt;.&lt;name&gt;.resource.&lt;resourcens&gt;.&lt;resourcename&gt;.&lt;resourcetype&gt;"</td>
             <td>String</td>
             <td>Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource.</td>
         </tr>
         <tr>
             <td><h5>metrics.scope.k8soperator.resourcens</h5></td>
-            <td style="word-wrap: break-word;">"&lt;host&gt;.k8soperator.&lt;namespace&gt;.&lt;name&gt;.namespace.&lt;resourcens&gt;"</td>
+            <td style="word-wrap: break-word;">"&lt;host&gt;.k8soperator.&lt;namespace&gt;.&lt;name&gt;.namespace.&lt;resourcens&gt;.&lt;resourcetype&gt;"</td>
             <td>String</td>
             <td>Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource namespace.</td>
         </tr>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
index 0c8f3826..5ec4f42f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
@@ -32,7 +32,6 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
     private final Configuration configuration;
     private final Map<String, Map<JobManagerDeploymentStatus, Set<String>>> deployments =
             new ConcurrentHashMap<>();
-    public static final String DEPLOYMENT_GROUP_NAME = FlinkDeployment.class.getSimpleName();
     public static final String STATUS_GROUP_NAME = "JmDeploymentStatus";
     public static final String COUNTER_NAME = "Count";
 
@@ -68,8 +67,7 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
 
     private void initNamespaceDeploymentCounts(String ns) {
         parentMetricGroup
-                .createResourceNamespaceGroup(configuration, ns)
-                .addGroup(DEPLOYMENT_GROUP_NAME)
+                .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
                 .gauge(
                         COUNTER_NAME,
                         () -> deployments.get(ns).values().stream().mapToInt(Set::size).sum());
@@ -78,7 +76,7 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
     private void initNamespaceStatusCounts(String ns) {
         for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
             parentMetricGroup
-                    .createResourceNamespaceGroup(configuration, ns)
+                    .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
                     .addGroup(STATUS_GROUP_NAME)
                     .addGroup(status.toString())
                     .gauge(COUNTER_NAME, () -> deployments.get(ns).get(status).size());
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
index 1d099519..cf43254b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
@@ -30,7 +30,6 @@ public class FlinkSessionJobMetrics implements CustomResourceMetrics<FlinkSessio
     private final KubernetesOperatorMetricGroup parentMetricGroup;
     private final Configuration configuration;
     private final Map<String, Set<String>> sessionJobs = new ConcurrentHashMap<>();
-    public static final String METRIC_GROUP_NAME = FlinkSessionJob.class.getSimpleName();
     public static final String COUNTER_NAME = "Count";
 
     public FlinkSessionJobMetrics(
@@ -62,8 +61,7 @@ public class FlinkSessionJobMetrics implements CustomResourceMetrics<FlinkSessio
 
     private void initNamespaceSessionJobCounts(String ns) {
         parentMetricGroup
-                .createResourceNamespaceGroup(configuration, ns)
-                .addGroup(METRIC_GROUP_NAME)
+                .createResourceNamespaceGroup(configuration, FlinkSessionJob.class, ns)
                 .gauge(COUNTER_NAME, () -> sessionJobs.get(ns).size());
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
index a410d6f8..210af978 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.metrics;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -46,13 +47,17 @@ public class KubernetesOperatorMetricGroup extends AbstractMetricGroup<AbstractM
     }
 
     public KubernetesResourceNamespaceMetricGroup createResourceNamespaceGroup(
-            Configuration config, String resourceNs) {
+            Configuration config,
+            Class<? extends AbstractFlinkResource> resourceClass,
+            String resourceNs) {
+        var resourceType = resourceClass.getSimpleName();
         return new KubernetesResourceNamespaceMetricGroup(
                 registry,
                 this,
                 KubernetesResourceNamespaceScopeFormat.fromConfig(config)
-                        .formatScope(namespace, name, hostname, resourceNs),
-                resourceNs);
+                        .formatScope(namespace, name, hostname, resourceNs, resourceType),
+                resourceNs,
+                resourceType);
     }
 
     public static KubernetesOperatorMetricGroup create(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index ae014b9f..9da0b360 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -82,7 +82,8 @@ public class KubernetesOperatorMetricOptions {
     public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCENS =
             ConfigOptions.key("metrics.scope.k8soperator.resourcens")
                     .stringType()
-                    .defaultValue("<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>")
+                    .defaultValue(
+                            "<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>.<resourcetype>")
                     .withDescription(
                             "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource namespace.");
 
@@ -90,7 +91,7 @@ public class KubernetesOperatorMetricOptions {
             ConfigOptions.key("metrics.scope.k8soperator.resource")
                     .stringType()
                     .defaultValue(
-                            "<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>")
+                            "<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>.<resourcetype>")
                     .withDescription(
                             "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource.");
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java
index bd6f8e22..b3dd9513 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java
@@ -30,14 +30,17 @@ public class KubernetesResourceNamespaceMetricGroup
         extends AbstractMetricGroup<KubernetesOperatorMetricGroup> {
 
     private final String resourceNs;
+    private final String resourceType;
 
     protected KubernetesResourceNamespaceMetricGroup(
             MetricRegistry registry,
             KubernetesOperatorMetricGroup parent,
             String[] scope,
-            String resourceNs) {
+            String resourceNs,
+            String resourceType) {
         super(registry, scope, parent);
         this.resourceNs = resourceNs;
+        this.resourceType = resourceType;
     }
 
     public KubernetesResourceMetricGroup createResourceNamespaceGroup(
@@ -51,13 +54,15 @@ public class KubernetesResourceNamespaceMetricGroup
                                 parent.name,
                                 parent.hostname,
                                 resourceNs,
-                                resourceName),
+                                resourceName,
+                                resourceType),
                 resourceName);
     }
 
     @Override
     protected final void putVariables(Map<String, String> variables) {
         variables.put(KubernetesResourceNamespaceScopeFormat.RESOURCE_NS, resourceNs);
+        variables.put(KubernetesResourceNamespaceScopeFormat.RESOURCE_TYPE, resourceType);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java
index 17b6ffe1..d421832f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java
@@ -28,14 +28,20 @@ import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorSco
 public class KubernetesResourceNamespaceScopeFormat extends ScopeFormat {
 
     public static final String RESOURCE_NS = asVariable("resourcens");
+    public static final String RESOURCE_TYPE = asVariable("resourcetype");
 
     public KubernetesResourceNamespaceScopeFormat(String format) {
-        super(format, null, new String[] {NAMESPACE, NAME, SCOPE_HOST, RESOURCE_NS});
+        super(format, null, new String[] {NAMESPACE, NAME, SCOPE_HOST, RESOURCE_NS, RESOURCE_TYPE});
     }
 
-    public String[] formatScope(String namespace, String name, String hostname, String resourceNs) {
+    public String[] formatScope(
+            String namespace,
+            String name,
+            String hostname,
+            String resourceNs,
+            String resourceType) {
         final String[] template = copyTemplate();
-        final String[] values = {namespace, name, hostname, resourceNs};
+        final String[] values = {namespace, name, hostname, resourceNs, resourceType};
         return bindVariables(template, values);
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java
index 7f05ab63..0c93e9aa 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java
@@ -24,6 +24,7 @@ import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMet
 import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorScopeFormat.NAME;
 import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorScopeFormat.NAMESPACE;
 import static org.apache.flink.kubernetes.operator.metrics.KubernetesResourceNamespaceScopeFormat.RESOURCE_NS;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesResourceNamespaceScopeFormat.RESOURCE_TYPE;
 
 /** Format for metrics. * */
 public class KubernetesResourceScopeFormat extends ScopeFormat {
@@ -31,7 +32,10 @@ public class KubernetesResourceScopeFormat extends ScopeFormat {
     public static final String RESOURCE = asVariable("resourcename");
 
     public KubernetesResourceScopeFormat(String format) {
-        super(format, null, new String[] {NAMESPACE, NAME, SCOPE_HOST, RESOURCE_NS, RESOURCE});
+        super(
+                format,
+                null,
+                new String[] {NAMESPACE, NAME, SCOPE_HOST, RESOURCE_NS, RESOURCE, RESOURCE_TYPE});
     }
 
     public String[] formatScope(
@@ -39,9 +43,10 @@ public class KubernetesResourceScopeFormat extends ScopeFormat {
             String name,
             String hostname,
             String resourceNs,
-            String resourceName) {
+            String resourceName,
+            String resourceType) {
         final String[] template = copyTemplate();
-        final String[] values = {namespace, name, hostname, resourceNs, resourceName};
+        final String[] values = {namespace, name, hostname, resourceNs, resourceName, resourceType};
         return bindVariables(template, values);
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
index fa09fb97..7c2e85b7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
@@ -18,8 +18,9 @@
 package org.apache.flink.kubernetes.operator.metrics;
 
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
-import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MetricGroup;
@@ -28,15 +29,19 @@ import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.SystemClock;
 
 import io.javaoperatorsdk.operator.api.monitoring.Metrics;
+import io.javaoperatorsdk.operator.api.reconciler.Constants;
 import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.processing.GroupVersionKind;
 import io.javaoperatorsdk.operator.processing.event.Event;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
+import org.jetbrains.annotations.NotNull;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
@@ -62,13 +67,6 @@ public class OperatorJosdkMetrics implements Metrics {
     private final Map<List<String>, Histogram> histograms = new ConcurrentHashMap<>();
     private final Map<List<String>, Counter> counters = new ConcurrentHashMap<>();
 
-    private static final Map<String, String> CONTROLLERS =
-            Map.of(
-                    FlinkDeploymentController.class.getSimpleName().toLowerCase(),
-                    "FlinkDeployment",
-                    FlinkSessionJobController.class.getSimpleName().toLowerCase(),
-                    "FlinkSessionJob");
-
     public OperatorJosdkMetrics(
             KubernetesOperatorMetricGroup operatorMetricGroup, FlinkConfigManager configManager) {
         this.operatorMetricGroup = operatorMetricGroup;
@@ -90,12 +88,13 @@ public class OperatorJosdkMetrics implements Metrics {
     }
 
     @Override
-    public void receivedEvent(Event event) {
+    public void receivedEvent(Event event, Map<String, Object> metadata) {
         if (event instanceof ResourceEvent) {
             var action = ((ResourceEvent) event).getAction();
-            counter(getResourceMg(event.getRelatedCustomResourceID()), RESOURCE, EVENT).inc();
+            counter(getResourceMg(event.getRelatedCustomResourceID(), metadata), RESOURCE, EVENT)
+                    .inc();
             counter(
-                            getResourceMg(event.getRelatedCustomResourceID()),
+                            getResourceMg(event.getRelatedCustomResourceID(), metadata),
                             RESOURCE,
                             EVENT,
                             action.name())
@@ -104,32 +103,34 @@ public class OperatorJosdkMetrics implements Metrics {
     }
 
     @Override
-    public void cleanupDoneFor(ResourceID resourceID) {
-        counter(getResourceMg(resourceID), RECONCILIATION, "cleanup").inc();
+    public void cleanupDoneFor(ResourceID resourceID, Map<String, Object> metadata) {
+        counter(getResourceMg(resourceID, metadata), RECONCILIATION, "cleanup").inc();
     }
 
     @Override
-    public void reconcileCustomResource(ResourceID resourceID, RetryInfo retryInfoNullable) {
-        counter(getResourceMg(resourceID), RECONCILIATION).inc();
+    public void reconcileCustomResource(
+            ResourceID resourceID, RetryInfo retryInfoNullable, Map<String, Object> metadata) {
+        counter(getResourceMg(resourceID, metadata), RECONCILIATION).inc();
 
         if (retryInfoNullable != null) {
-            counter(getResourceMg(resourceID), RECONCILIATION, "retries").inc();
+            counter(getResourceMg(resourceID, metadata), RECONCILIATION, "retries").inc();
         }
     }
 
     @Override
-    public void finishedReconciliation(ResourceID resourceID) {
-        counter(getResourceMg(resourceID), RECONCILIATION, "finished").inc();
+    public void finishedReconciliation(ResourceID resourceID, Map<String, Object> metadata) {
+        counter(getResourceMg(resourceID, metadata), RECONCILIATION, "finished").inc();
     }
 
     @Override
-    public void failedReconciliation(ResourceID resourceID, Exception exception) {
-        counter(getResourceMg(resourceID), RECONCILIATION, "failed").inc();
+    public void failedReconciliation(
+            ResourceID resourceID, Exception exception, Map<String, Object> metadata) {
+        counter(getResourceMg(resourceID, metadata), RECONCILIATION, "failed").inc();
     }
 
     @Override
     public <T extends Map<?, ?>> T monitorSizeOf(T map, String name) {
-        operatorMetricGroup.addGroup(name).gauge("size", map::size);
+        operatorMetricGroup.addGroup(OPERATOR_SDK_GROUP).addGroup(name).gauge("size", map::size);
         return map;
     }
 
@@ -138,7 +139,9 @@ public class OperatorJosdkMetrics implements Metrics {
         return histograms.computeIfAbsent(
                 groups,
                 k -> {
-                    var group = operatorMetricGroup.addGroup(OPERATOR_SDK_GROUP);
+                    var group =
+                            getResourceNsMg(execution.resourceID(), execution.metadata())
+                                    .addGroup(OPERATOR_SDK_GROUP);
                     for (String mg : groups) {
                         group = group.addGroup(mg);
                     }
@@ -151,8 +154,7 @@ public class OperatorJosdkMetrics implements Metrics {
     }
 
     private List<String> getHistoGroups(ControllerExecution<?> execution, String name) {
-        return List.of(
-                CONTROLLERS.get(execution.controllerName().toLowerCase()), execution.name(), name);
+        return List.of(execution.name(), name);
     }
 
     private long toSeconds(long startTime) {
@@ -176,20 +178,52 @@ public class OperatorJosdkMetrics implements Metrics {
                 });
     }
 
-    private KubernetesResourceNamespaceMetricGroup getResourceNsMg(ResourceID resourceID) {
+    private KubernetesResourceNamespaceMetricGroup getResourceNsMg(
+            ResourceID resourceID, Map<String, Object> metadata) {
+        Class<? extends AbstractFlinkResource<?, ?>> resourceClass =
+                getResourceClass(metadata)
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                "Unknown resource kind for " + resourceID));
+
         return resourceNsMetricGroups.computeIfAbsent(
                 resourceID,
                 rid ->
                         operatorMetricGroup.createResourceNamespaceGroup(
                                 configManager.getDefaultConfig(),
+                                resourceClass,
                                 rid.getNamespace().orElse("default")));
     }
 
-    private KubernetesResourceMetricGroup getResourceMg(ResourceID resourceID) {
+    @NotNull
+    private Optional<Class<? extends AbstractFlinkResource<?, ?>>> getResourceClass(
+            Map<String, Object> metadata) {
+        var resourceGvk = (GroupVersionKind) metadata.get(Constants.RESOURCE_GVK_KEY);
+
+        if (resourceGvk == null) {
+            return Optional.empty();
+        }
+
+        Class<? extends AbstractFlinkResource<?, ?>> resourceClass;
+
+        if (resourceGvk.kind.equals(FlinkDeployment.class.getSimpleName())) {
+            resourceClass = FlinkDeployment.class;
+        } else if (resourceGvk.kind.equals(FlinkSessionJob.class.getSimpleName())) {
+            resourceClass = FlinkSessionJob.class;
+        } else {
+            return Optional.empty();
+        }
+
+        return Optional.of(resourceClass);
+    }
+
+    private KubernetesResourceMetricGroup getResourceMg(
+            ResourceID resourceID, Map<String, Object> metadata) {
         return resourceMetricGroups.computeIfAbsent(
                 resourceID,
                 rid ->
-                        getResourceNsMg(rid)
+                        getResourceNsMg(rid, metadata)
                                 .createResourceNamespaceGroup(
                                         configManager.getDefaultConfig(), rid.getName()));
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
index 872e749c..dde85075 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
@@ -78,7 +78,7 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
     private Map<String, Tuple2<Histogram, Map<String, Histogram>>> transitionMetrics;
     private Map<ResourceLifecycleState, Tuple2<Histogram, Map<String, Histogram>>> stateTimeMetrics;
 
-    private Function<MetricGroup, MetricGroup> metricGroupFunction;
+    private Function<MetricGroup, MetricGroup> metricGroupFunction = mg -> mg.addGroup("Lifecycle");
 
     public LifecycleMetrics(
             FlinkConfigManager configManager, KubernetesOperatorMetricGroup operatorMetricGroup) {
@@ -106,7 +106,7 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
 
     private ResourceLifecycleMetricTracker getLifecycleMetricTracker(CR cr) {
         init(cr);
-        createNamespaceStateCountIfMissing(cr.getMetadata().getNamespace());
+        createNamespaceStateCountIfMissing(cr);
         return lifecycleTrackers.computeIfAbsent(
                 Tuple2.of(cr.getMetadata().getNamespace(), cr.getMetadata().getName()),
                 k -> {
@@ -123,7 +123,8 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
                 });
     }
 
-    private void createNamespaceStateCountIfMissing(String namespace) {
+    private void createNamespaceStateCountIfMissing(CR cr) {
+        var namespace = cr.getMetadata().getNamespace();
         if (!namespaces.add(namespace)) {
             return;
         }
@@ -131,7 +132,7 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
         MetricGroup lifecycleGroup =
                 metricGroupFunction.apply(
                         operatorMetricGroup.createResourceNamespaceGroup(
-                                configManager.getDefaultConfig(), namespace));
+                                configManager.getDefaultConfig(), cr.getClass(), namespace));
         for (ResourceLifecycleState state : ResourceLifecycleState.values()) {
             lifecycleGroup
                     .addGroup("State")
@@ -150,8 +151,6 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
         if (transitionMetrics != null) {
             return;
         }
-        this.metricGroupFunction =
-                mg -> mg.addGroup(cr.getClass().getSimpleName()).addGroup("Lifecycle");
 
         this.transitionMetrics = new ConcurrentHashMap<>();
         TRACKED_TRANSITIONS.forEach(
@@ -192,6 +191,8 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
                                                                                 .createResourceNamespaceGroup(
                                                                                         configManager
                                                                                                 .getDefaultConfig(),
+                                                                                        cr
+                                                                                                .getClass(),
                                                                                         ns))))
                                         : List.of(t.f0)));
         return histos;
@@ -215,6 +216,8 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
                                                                                 .createResourceNamespaceGroup(
                                                                                         configManager
                                                                                                 .getDefaultConfig(),
+                                                                                        cr
+                                                                                                .getClass(),
                                                                                         ns))))
                                         : List.of(t.f0)));
         return histos;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
index 3c368fef..81624a57 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
@@ -27,7 +27,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.COUNTER_NAME;
-import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.DEPLOYMENT_GROUP_NAME;
 import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATUS_GROUP_NAME;
 import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -55,12 +54,16 @@ public class FlinkDeploymentMetricsTest {
         var deployment2 = TestUtils.buildApplicationCluster("deployment2", namespace);
 
         var counterId =
-                listener.getNamespaceMetricId(namespace, DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+                listener.getNamespaceMetricId(FlinkDeployment.class, namespace, COUNTER_NAME);
         assertTrue(listener.getGauge(counterId).isEmpty());
         for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
             var statusId =
                     listener.getNamespaceMetricId(
-                            namespace, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+                            FlinkDeployment.class,
+                            namespace,
+                            STATUS_GROUP_NAME,
+                            status.name(),
+                            COUNTER_NAME);
             assertTrue(listener.getGauge(statusId).isEmpty());
         }
 
@@ -75,7 +78,11 @@ public class FlinkDeploymentMetricsTest {
 
             var statusId =
                     listener.getNamespaceMetricId(
-                            namespace, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+                            FlinkDeployment.class,
+                            namespace,
+                            STATUS_GROUP_NAME,
+                            status.name(),
+                            COUNTER_NAME);
             assertEquals(2, listener.getGauge(statusId).get().getValue());
         }
 
@@ -85,7 +92,11 @@ public class FlinkDeploymentMetricsTest {
         for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
             var statusId =
                     listener.getNamespaceMetricId(
-                            namespace, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+                            FlinkDeployment.class,
+                            namespace,
+                            STATUS_GROUP_NAME,
+                            status.name(),
+                            COUNTER_NAME);
             assertEquals(0, listener.getGauge(statusId).get().getValue());
         }
     }
@@ -98,18 +109,26 @@ public class FlinkDeploymentMetricsTest {
         var deployment2 = TestUtils.buildApplicationCluster("deployment", namespace2);
 
         var counterId1 =
-                listener.getNamespaceMetricId(namespace1, DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+                listener.getNamespaceMetricId(FlinkDeployment.class, namespace1, COUNTER_NAME);
         var counterId2 =
-                listener.getNamespaceMetricId(namespace2, DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+                listener.getNamespaceMetricId(FlinkDeployment.class, namespace2, COUNTER_NAME);
         assertTrue(listener.getGauge(counterId1).isEmpty());
         assertTrue(listener.getGauge(counterId2).isEmpty());
         for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
             var statusId1 =
                     listener.getNamespaceMetricId(
-                            namespace1, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+                            FlinkDeployment.class,
+                            namespace1,
+                            STATUS_GROUP_NAME,
+                            status.name(),
+                            COUNTER_NAME);
             var statusId2 =
                     listener.getNamespaceMetricId(
-                            namespace2, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+                            FlinkDeployment.class,
+                            namespace2,
+                            STATUS_GROUP_NAME,
+                            status.name(),
+                            COUNTER_NAME);
             assertTrue(listener.getGauge(statusId1).isEmpty());
             assertTrue(listener.getGauge(statusId2).isEmpty());
         }
@@ -125,10 +144,18 @@ public class FlinkDeploymentMetricsTest {
             metricManager.onUpdate(deployment2);
             var statusId1 =
                     listener.getNamespaceMetricId(
-                            namespace1, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+                            FlinkDeployment.class,
+                            namespace1,
+                            STATUS_GROUP_NAME,
+                            status.name(),
+                            COUNTER_NAME);
             var statusId2 =
                     listener.getNamespaceMetricId(
-                            namespace2, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+                            FlinkDeployment.class,
+                            namespace2,
+                            STATUS_GROUP_NAME,
+                            status.name(),
+                            COUNTER_NAME);
             assertEquals(1, listener.getGauge(statusId1).get().getValue());
             assertEquals(1, listener.getGauge(statusId2).get().getValue());
         }
@@ -143,10 +170,18 @@ public class FlinkDeploymentMetricsTest {
             deployment2.getStatus().setJobManagerDeploymentStatus(status);
             var statusId1 =
                     listener.getNamespaceMetricId(
-                            namespace1, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+                            FlinkDeployment.class,
+                            namespace1,
+                            STATUS_GROUP_NAME,
+                            status.name(),
+                            COUNTER_NAME);
             var statusId2 =
                     listener.getNamespaceMetricId(
-                            namespace2, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+                            FlinkDeployment.class,
+                            namespace2,
+                            STATUS_GROUP_NAME,
+                            status.name(),
+                            COUNTER_NAME);
             assertEquals(0, listener.getGauge(statusId1).get().getValue());
             assertEquals(0, listener.getGauge(statusId2).get().getValue());
         }
@@ -166,13 +201,17 @@ public class FlinkDeploymentMetricsTest {
         var deployment = TestUtils.buildApplicationCluster("deployment", namespace);
 
         var counterId =
-                listener.getNamespaceMetricId(namespace, DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+                listener.getNamespaceMetricId(FlinkDeployment.class, namespace, COUNTER_NAME);
         metricManager.onUpdate(deployment);
         assertTrue(listener.getGauge(counterId).isEmpty());
         for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
             var statusId =
                     listener.getNamespaceMetricId(
-                            namespace, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+                            FlinkDeployment.class,
+                            namespace,
+                            STATUS_GROUP_NAME,
+                            status.name(),
+                            COUNTER_NAME);
             assertTrue(listener.getGauge(statusId).isEmpty());
         }
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
index 284d6f42..4c813c11 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
@@ -26,7 +26,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.kubernetes.operator.metrics.FlinkSessionJobMetrics.COUNTER_NAME;
-import static org.apache.flink.kubernetes.operator.metrics.FlinkSessionJobMetrics.METRIC_GROUP_NAME;
 import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -51,7 +50,8 @@ public class FlinkSessionJobMetricsTest {
         var namespace = "test-ns";
         var job1 = TestUtils.buildSessionJob("job1", namespace);
         var job2 = TestUtils.buildSessionJob("job2", namespace);
-        var metricId = listener.getNamespaceMetricId(namespace, METRIC_GROUP_NAME, COUNTER_NAME);
+        var metricId =
+                listener.getNamespaceMetricId(FlinkSessionJob.class, namespace, COUNTER_NAME);
         assertTrue(listener.getGauge(namespace).isEmpty());
 
         metricManager.onUpdate(job1);
@@ -78,8 +78,10 @@ public class FlinkSessionJobMetricsTest {
         var job1 = TestUtils.buildSessionJob("job", namespace1);
         var job2 = TestUtils.buildSessionJob("job", namespace2);
 
-        var metricId1 = listener.getNamespaceMetricId(namespace1, METRIC_GROUP_NAME, COUNTER_NAME);
-        var metricId2 = listener.getNamespaceMetricId(namespace2, METRIC_GROUP_NAME, COUNTER_NAME);
+        var metricId1 =
+                listener.getNamespaceMetricId(FlinkSessionJob.class, namespace1, COUNTER_NAME);
+        var metricId2 =
+                listener.getNamespaceMetricId(FlinkSessionJob.class, namespace2, COUNTER_NAME);
 
         assertTrue(listener.getGauge(metricId1).isEmpty());
         assertTrue(listener.getGauge(metricId2).isEmpty());
@@ -118,8 +120,8 @@ public class FlinkSessionJobMetricsTest {
 
         var metricId =
                 listener.getNamespaceMetricId(
+                        FlinkSessionJob.class,
                         flinkSessionJob.getMetadata().getNamespace(),
-                        METRIC_GROUP_NAME,
                         COUNTER_NAME);
 
         assertTrue(listener.getGauge(metricId).isEmpty());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
index dc2f28d5..35066040 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
@@ -18,13 +18,14 @@
 package org.apache.flink.kubernetes.operator.metrics;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
-
 import org.junit.jupiter.api.Test;
 
+import java.util.Map;
+
 import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.SCOPE_NAMING_KUBERNETES_OPERATOR;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -53,7 +54,7 @@ public class KubernetesOperatorMetricGroupTest {
                 group.getMetricIdentifier("test"));
 
         assertEquals(
-                ImmutableMap.of(
+                Map.of(
                         "<host>",
                         "localhost",
                         "<namespace>",
@@ -86,7 +87,7 @@ public class KubernetesOperatorMetricGroupTest {
                 group.getMetricIdentifier("test"));
 
         assertEquals(
-                ImmutableMap.of(
+                Map.of(
                         "<host>",
                         "localhost",
                         "<namespace>",
@@ -110,11 +111,13 @@ public class KubernetesOperatorMetricGroupTest {
                         "flink-kubernetes-operator",
                         "localhost");
 
-        var namespaceGroup = operatorMetricGroup.createResourceNamespaceGroup(configuration, "rns");
+        var namespaceGroup =
+                operatorMetricGroup.createResourceNamespaceGroup(
+                        configuration, FlinkSessionJob.class, "rns");
         var resourceGroup = namespaceGroup.createResourceNamespaceGroup(configuration, "rn");
 
         assertEquals(
-                ImmutableMap.of(
+                Map.of(
                         "<host>",
                         "localhost",
                         "<namespace>",
@@ -122,10 +125,13 @@ public class KubernetesOperatorMetricGroupTest {
                         "<name>",
                         "flink-kubernetes-operator",
                         "<resourcens>",
-                        "rns"),
+                        "rns",
+                        "<resourcetype>",
+                        "FlinkSessionJob"),
                 namespaceGroup.getAllVariables());
+
         assertEquals(
-                ImmutableMap.of(
+                Map.of(
                         "<host>",
                         "localhost",
                         "<namespace>",
@@ -135,7 +141,9 @@ public class KubernetesOperatorMetricGroupTest {
                         "<resourcens>",
                         "rns",
                         "<resourcename>",
-                        "rn"),
+                        "rn",
+                        "<resourcetype>",
+                        "FlinkSessionJob"),
                 resourceGroup.getAllVariables());
         registry.shutdown().get();
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
index e38cffee..49cd3d44 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
@@ -18,16 +18,11 @@
 package org.apache.flink.kubernetes.operator.metrics;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 
 import io.javaoperatorsdk.operator.api.monitoring.Metrics;
 import io.javaoperatorsdk.operator.api.reconciler.Constants;
@@ -39,7 +34,6 @@ import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEv
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -50,40 +44,31 @@ public class OperatorJosdkMetricsTest {
 
     private static final ResourceID resourceId = new ResourceID("testname", "testns");
     private static final String controllerName = FlinkDeploymentController.class.getSimpleName();
-    private static final String resourcePrefix =
-            "testhost.k8soperator.flink-operator-test.testopname.resource.testns.testname.JOSDK.";
-    private static final String systemPrefix =
-            "testhost.k8soperator.flink-operator-test.testopname.system.";
-    private static final String executionPrefix = systemPrefix + "JOSDK.FlinkDeployment.";
+    private static final Map<String, Object> metadata =
+            Map.of(Constants.RESOURCE_GVK_KEY, GroupVersionKind.gvkFor(FlinkDeployment.class));
 
-    private Map<String, Metric> metrics = new HashMap<>();
     private OperatorJosdkMetrics operatorMetrics;
+    private TestingMetricListener listener;
 
     @BeforeEach
     public void setup() {
-        TestingMetricRegistry registry =
-                TestingMetricRegistry.builder()
-                        .setDelimiter(".".charAt(0))
-                        .setRegisterConsumer(
-                                (metric, name, group) ->
-                                        metrics.put(group.getMetricIdentifier(name), metric))
-                        .build();
+        listener = new TestingMetricListener(new Configuration());
         operatorMetrics =
                 new OperatorJosdkMetrics(
-                        TestUtils.createTestMetricGroup(registry, new Configuration()),
-                        new FlinkConfigManager(new Configuration()));
+                        listener.getMetricGroup(), new FlinkConfigManager(new Configuration()));
     }
 
     @Test
     public void testTimeControllerExecution() throws Exception {
         var successExecution = new TestingExecutionBase<>();
         operatorMetrics.timeControllerExecution(successExecution);
-        assertEquals(1, metrics.size());
+
+        assertEquals(1, listener.size());
         assertEquals(1, getHistogram("reconcile", "resource").getCount());
         assertEquals(1, getHistogram("reconcile", "resource").getStatistics().getMin());
         operatorMetrics.timeControllerExecution(successExecution);
         operatorMetrics.timeControllerExecution(successExecution);
-        assertEquals(1, metrics.size());
+        assertEquals(1, listener.size());
         assertEquals(3, getHistogram("reconcile", "resource").getCount());
         assertEquals(1, getHistogram("reconcile", "resource").getStatistics().getMin());
 
@@ -104,7 +89,7 @@ public class OperatorJosdkMetricsTest {
             operatorMetrics.timeControllerExecution(failureExecution);
             fail();
         } catch (Exception e) {
-            assertEquals(2, metrics.size());
+            assertEquals(2, listener.size());
             assertEquals(1, getHistogram("cleanup", "failed").getCount());
             assertEquals(1, getHistogram("cleanup", "failed").getStatistics().getMin());
         }
@@ -117,7 +102,7 @@ public class OperatorJosdkMetricsTest {
             operatorMetrics.timeControllerExecution(failureExecution);
             fail();
         } catch (Exception e) {
-            assertEquals(2, metrics.size());
+            assertEquals(2, listener.size());
             assertEquals(3, getHistogram("cleanup", "failed").getCount());
             assertEquals(1, getHistogram("cleanup", "failed").getStatistics().getMin());
         }
@@ -125,16 +110,16 @@ public class OperatorJosdkMetricsTest {
 
     @Test
     public void testMetrics() {
-        operatorMetrics.failedReconciliation(resourceId, null);
-        assertEquals(1, metrics.size());
+        operatorMetrics.failedReconciliation(resourceId, null, metadata);
+        assertEquals(1, listener.size());
         assertEquals(1, getCount("Reconciliation.failed"));
-        operatorMetrics.failedReconciliation(resourceId, null);
-        operatorMetrics.failedReconciliation(resourceId, null);
-        assertEquals(1, metrics.size());
+        operatorMetrics.failedReconciliation(resourceId, null, metadata);
+        operatorMetrics.failedReconciliation(resourceId, null, metadata);
+        assertEquals(1, listener.size());
         assertEquals(3, getCount("Reconciliation.failed"));
 
-        operatorMetrics.reconcileCustomResource(resourceId, null);
-        assertEquals(2, metrics.size());
+        operatorMetrics.reconcileCustomResource(resourceId, null, metadata);
+        assertEquals(2, listener.size());
         assertEquals(1, getCount("Reconciliation"));
 
         operatorMetrics.reconcileCustomResource(
@@ -149,45 +134,70 @@ public class OperatorJosdkMetricsTest {
                     public boolean isLastAttempt() {
                         return false;
                     }
-                });
-        assertEquals(3, metrics.size());
+                },
+                metadata);
+        assertEquals(3, listener.size());
         assertEquals(2, getCount("Reconciliation"));
         assertEquals(1, getCount("Reconciliation.retries"));
 
-        operatorMetrics.receivedEvent(new ResourceEvent(ResourceAction.ADDED, resourceId, null));
-        assertEquals(5, metrics.size());
+        operatorMetrics.receivedEvent(
+                new ResourceEvent(ResourceAction.ADDED, resourceId, null), metadata);
+        assertEquals(5, listener.size());
         assertEquals(1, getCount("Resource.Event"));
         assertEquals(1, getCount("Resource.Event.ADDED"));
 
-        operatorMetrics.cleanupDoneFor(resourceId);
-        assertEquals(6, metrics.size());
+        operatorMetrics.cleanupDoneFor(resourceId, metadata);
+        assertEquals(6, listener.size());
         assertEquals(1, getCount("Reconciliation.cleanup"));
 
-        operatorMetrics.finishedReconciliation(resourceId);
-        assertEquals(7, metrics.size());
+        operatorMetrics.finishedReconciliation(resourceId, metadata);
+        assertEquals(7, listener.size());
         assertEquals(1, getCount("Reconciliation.finished"));
 
         operatorMetrics.monitorSizeOf(Map.of("a", "b", "c", "d"), "mymap");
-        assertEquals(8, metrics.size());
-        assertEquals(2, ((Gauge<Integer>) metrics.get(systemPrefix + "mymap.size")).getValue());
+        assertEquals(8, listener.size());
+        assertEquals(
+                2,
+                listener.getGauge(listener.getMetricId("JOSDK", "mymap", "size")).get().getValue());
 
-        operatorMetrics.reconcileCustomResource(new ResourceID("other", "otherns"), null);
-        assertEquals(9, metrics.size());
+        operatorMetrics.reconcileCustomResource(new ResourceID("other", "otherns"), null, metadata);
+        assertEquals(9, listener.size());
         assertEquals(
                 1,
-                ((Counter)
-                                metrics.get(
-                                        "testhost.k8soperator.flink-operator-test.testopname.resource.otherns.other.JOSDK.Reconciliation.Count"))
+                listener.getCounter(
+                                listener.getResourceMetricId(
+                                        FlinkDeployment.class,
+                                        "otherns",
+                                        "other",
+                                        "JOSDK",
+                                        "Reconciliation",
+                                        "Count"))
+                        .get()
                         .getCount());
     }
 
     private Histogram getHistogram(String... names) {
-        return ((Histogram)
-                metrics.get(executionPrefix + String.join(".", names) + ".TimeSeconds"));
+        return listener.getHistogram(
+                        listener.getNamespaceMetricId(
+                                FlinkDeployment.class,
+                                "testns",
+                                "JOSDK",
+                                String.join(".", names),
+                                "TimeSeconds"))
+                .get();
     }
 
     private long getCount(String name) {
-        return ((Counter) metrics.get(resourcePrefix + name + ".Count")).getCount();
+        return listener.getCounter(
+                        listener.getResourceMetricId(
+                                FlinkDeployment.class,
+                                "testns",
+                                "testname",
+                                "JOSDK",
+                                name,
+                                "Count"))
+                .get()
+                .getCount();
     }
 
     private static class TestingExecutionBase<T> implements Metrics.ControllerExecution<T> {
@@ -208,8 +218,7 @@ public class OperatorJosdkMetricsTest {
 
         @Override
         public Map<String, Object> metadata() {
-            return Map.of(
-                    Constants.RESOURCE_GVK_KEY, GroupVersionKind.gvkFor(FlinkDeployment.class));
+            return metadata;
         }
 
         @Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java
index 294822d2..3ce5f21b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.metrics;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
@@ -83,9 +84,27 @@ public class TestingMetricListener {
         return metricGroup.getMetricIdentifier(String.join(DELIMITER, identifiers));
     }
 
-    public String getNamespaceMetricId(String resourceNs, String... identifiers) {
+    public String getNamespaceMetricId(
+            Class<? extends AbstractFlinkResource<?, ?>> resourceClass,
+            String resourceNs,
+            String... identifiers) {
         return metricGroup
-                .createResourceNamespaceGroup(configuration, resourceNs)
+                .createResourceNamespaceGroup(configuration, resourceClass, resourceNs)
                 .getMetricIdentifier(String.join(DELIMITER, identifiers));
     }
+
+    public String getResourceMetricId(
+            Class<? extends AbstractFlinkResource<?, ?>> resourceClass,
+            String resourceNs,
+            String resourceName,
+            String... identifiers) {
+        return metricGroup
+                .createResourceNamespaceGroup(configuration, resourceClass, resourceNs)
+                .createResourceNamespaceGroup(configuration, resourceName)
+                .getMetricIdentifier(String.join(DELIMITER, identifiers));
+    }
+
+    public int size() {
+        return metrics.size();
+    }
 }