You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/08 12:40:47 UTC

[flink-kubernetes-operator] 01/03: [FLINK-28297] Improve operator metric groups

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 1b3acc23868f9ad09ea73e7556a7b31176496783
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Jul 7 10:20:26 2022 +0200

    [FLINK-28297] Improve operator metric groups
---
 .../flink/kubernetes/operator/FlinkOperator.java   | 12 ++--
 .../metrics/KubernetesOperatorMetricGroup.java     | 24 ++++---
 .../metrics/KubernetesOperatorMetricOptions.java   | 18 +++++-
 ...oup.java => KubernetesResourceMetricGroup.java} | 47 ++++----------
 ...=> KubernetesResourceNamespaceMetricGroup.java} | 55 +++++++---------
 .../KubernetesResourceNamespaceScopeFormat.java    | 46 ++++++++++++++
 .../metrics/KubernetesResourceScopeFormat.java     | 52 +++++++++++++++
 .../kubernetes/operator/metrics/MetricManager.java | 19 +++---
 .../operator/metrics/OperatorMetricUtils.java      |  2 +-
 .../flink/kubernetes/operator/TestUtils.java       | 28 +++++++++
 .../kubernetes/operator/TestingStatusRecorder.java |  5 +-
 .../TestingFlinkDeploymentController.java          |  5 +-
 .../listener/FlinkResourceListenerTest.java        |  5 +-
 .../metrics/KubernetesOperatorMetricGroupTest.java | 48 +++++++++++++-
 .../operator/utils/StatusRecorderTest.java         | 73 ++++++++++++++--------
 15 files changed, 309 insertions(+), 130 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 590af2c..419702d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
 import org.apache.flink.kubernetes.operator.listener.ListenerUtils;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
 import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
@@ -40,7 +41,6 @@ import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
-import org.apache.flink.metrics.MetricGroup;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -70,7 +70,7 @@ public class FlinkOperator {
     private final FlinkConfigManager configManager;
     private final Set<FlinkResourceValidator> validators;
     private final Set<RegisteredController> registeredControllers = new HashSet<>();
-    private final MetricGroup metricGroup;
+    private final KubernetesOperatorMetricGroup metricGroup;
     private final Collection<FlinkResourceListener> listeners;
 
     public FlinkOperator(@Nullable Configuration conf) {
@@ -114,7 +114,9 @@ public class FlinkOperator {
     private void registerDeploymentController() {
         var statusRecorder =
                 StatusRecorder.<FlinkDeploymentStatus>create(
-                        client, new MetricManager<>(metricGroup), listeners);
+                        client,
+                        new MetricManager<>(metricGroup, configManager.getDefaultConfig()),
+                        listeners);
         var eventRecorder = EventRecorder.create(client, listeners);
         var reconcilerFactory =
                 new ReconcilerFactory(
@@ -137,7 +139,9 @@ public class FlinkOperator {
         var eventRecorder = EventRecorder.create(client, listeners);
         var statusRecorder =
                 StatusRecorder.<FlinkSessionJobStatus>create(
-                        client, new MetricManager<>(metricGroup), listeners);
+                        client,
+                        new MetricManager<>(metricGroup, configManager.getDefaultConfig()),
+                        listeners);
         var reconciler =
                 new SessionJobReconciler(
                         client, flinkService, configManager, eventRecorder, statusRecorder);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
index 3b1c088..a410d6f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
@@ -26,14 +26,12 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import java.util.Map;
 
-/** Flink based operator metric group. */
-public class KubernetesOperatorMetricGroup
-        extends AbstractMetricGroup<KubernetesOperatorMetricGroup> {
+/** Base metric group for Flink Operator System metrics. */
+public class KubernetesOperatorMetricGroup extends AbstractMetricGroup<AbstractMetricGroup<?>> {
 
-    private static final String GROUP_NAME = "k8soperator";
-    private final String namespace;
-    private final String name;
-    private final String hostname;
+    protected final String namespace;
+    protected final String name;
+    protected final String hostname;
 
     private KubernetesOperatorMetricGroup(
             MetricRegistry registry,
@@ -47,6 +45,16 @@ public class KubernetesOperatorMetricGroup
         this.hostname = hostname;
     }
 
+    public KubernetesResourceNamespaceMetricGroup createResourceNamespaceGroup(
+            Configuration config, String resourceNs) {
+        return new KubernetesResourceNamespaceMetricGroup(
+                registry,
+                this,
+                KubernetesResourceNamespaceScopeFormat.fromConfig(config)
+                        .formatScope(namespace, name, hostname, resourceNs),
+                resourceNs);
+    }
+
     public static KubernetesOperatorMetricGroup create(
             MetricRegistry metricRegistry,
             Configuration configuration,
@@ -71,7 +79,7 @@ public class KubernetesOperatorMetricGroup
 
     @Override
     protected final String getGroupName(CharacterFilter filter) {
-        return GROUP_NAME;
+        return "k8soperator";
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index 42e1d40..30b02f9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -23,8 +23,22 @@ import org.apache.flink.configuration.ConfigOptions;
 /** Configuration options for metrics. */
 public class KubernetesOperatorMetricOptions {
     public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR =
-            ConfigOptions.key("metrics.scope.k8soperator")
-                    .defaultValue("<host>.k8soperator.<namespace>.<name>")
+            ConfigOptions.key("metrics.scope.k8soperator.system")
+                    .defaultValue("<host>.k8soperator.<namespace>.<name>.system")
+                    .withDeprecatedKeys("metrics.scope.k8soperator")
                     .withDescription(
                             "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator.");
+
+    public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCENS =
+            ConfigOptions.key("metrics.scope.k8soperator.resourcens")
+                    .defaultValue("<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>")
+                    .withDescription(
+                            "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource namespace.");
+
+    public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCE =
+            ConfigOptions.key("metrics.scope.k8soperator.resource")
+                    .defaultValue(
+                            "<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>")
+                    .withDescription(
+                            "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource.");
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceMetricGroup.java
similarity index 51%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceMetricGroup.java
index 3b1c088..b76a594 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceMetricGroup.java
@@ -17,61 +17,36 @@
 
 package org.apache.flink.kubernetes.operator.metrics;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import java.util.Map;
 
-/** Flink based operator metric group. */
-public class KubernetesOperatorMetricGroup
-        extends AbstractMetricGroup<KubernetesOperatorMetricGroup> {
+/** Base metric group for Flink Operator Resource level metrics. */
+public class KubernetesResourceMetricGroup
+        extends AbstractMetricGroup<KubernetesResourceNamespaceMetricGroup> {
 
-    private static final String GROUP_NAME = "k8soperator";
-    private final String namespace;
-    private final String name;
-    private final String hostname;
+    private final String resourceName;
 
-    private KubernetesOperatorMetricGroup(
+    protected KubernetesResourceMetricGroup(
             MetricRegistry registry,
+            KubernetesResourceNamespaceMetricGroup parent,
             String[] scope,
-            String namespace,
-            String name,
-            String hostname) {
-        super(registry, scope, null);
-        this.namespace = namespace;
-        this.name = name;
-        this.hostname = hostname;
-    }
-
-    public static KubernetesOperatorMetricGroup create(
-            MetricRegistry metricRegistry,
-            Configuration configuration,
-            String namespace,
-            String name,
-            String hostname) {
-        return new KubernetesOperatorMetricGroup(
-                metricRegistry,
-                KubernetesOperatorScopeFormat.fromConfig(configuration)
-                        .formatScope(namespace, name, hostname),
-                namespace,
-                name,
-                hostname);
+            String resourceName) {
+        super(registry, scope, parent);
+        this.resourceName = resourceName;
     }
 
     @Override
     protected final void putVariables(Map<String, String> variables) {
-        variables.put(KubernetesOperatorScopeFormat.NAMESPACE, namespace);
-        variables.put(KubernetesOperatorScopeFormat.NAME, name);
-        variables.put(ScopeFormat.SCOPE_HOST, hostname);
+        variables.put(KubernetesResourceScopeFormat.RESOURCE, resourceName);
     }
 
     @Override
     protected final String getGroupName(CharacterFilter filter) {
-        return GROUP_NAME;
+        return "resource";
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java
similarity index 56%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java
index 3b1c088..bd6f8e2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java
@@ -22,56 +22,47 @@ import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import java.util.Map;
 
-/** Flink based operator metric group. */
-public class KubernetesOperatorMetricGroup
+/** Base metric group for Flink Operator Resource namespace level metrics. */
+public class KubernetesResourceNamespaceMetricGroup
         extends AbstractMetricGroup<KubernetesOperatorMetricGroup> {
 
-    private static final String GROUP_NAME = "k8soperator";
-    private final String namespace;
-    private final String name;
-    private final String hostname;
+    private final String resourceNs;
 
-    private KubernetesOperatorMetricGroup(
+    protected KubernetesResourceNamespaceMetricGroup(
             MetricRegistry registry,
+            KubernetesOperatorMetricGroup parent,
             String[] scope,
-            String namespace,
-            String name,
-            String hostname) {
-        super(registry, scope, null);
-        this.namespace = namespace;
-        this.name = name;
-        this.hostname = hostname;
+            String resourceNs) {
+        super(registry, scope, parent);
+        this.resourceNs = resourceNs;
     }
 
-    public static KubernetesOperatorMetricGroup create(
-            MetricRegistry metricRegistry,
-            Configuration configuration,
-            String namespace,
-            String name,
-            String hostname) {
-        return new KubernetesOperatorMetricGroup(
-                metricRegistry,
-                KubernetesOperatorScopeFormat.fromConfig(configuration)
-                        .formatScope(namespace, name, hostname),
-                namespace,
-                name,
-                hostname);
+    public KubernetesResourceMetricGroup createResourceNamespaceGroup(
+            Configuration config, String resourceName) {
+        return new KubernetesResourceMetricGroup(
+                registry,
+                this,
+                KubernetesResourceScopeFormat.fromConfig(config)
+                        .formatScope(
+                                parent.namespace,
+                                parent.name,
+                                parent.hostname,
+                                resourceNs,
+                                resourceName),
+                resourceName);
     }
 
     @Override
     protected final void putVariables(Map<String, String> variables) {
-        variables.put(KubernetesOperatorScopeFormat.NAMESPACE, namespace);
-        variables.put(KubernetesOperatorScopeFormat.NAME, name);
-        variables.put(ScopeFormat.SCOPE_HOST, hostname);
+        variables.put(KubernetesResourceNamespaceScopeFormat.RESOURCE_NS, resourceNs);
     }
 
     @Override
     protected final String getGroupName(CharacterFilter filter) {
-        return GROUP_NAME;
+        return "namespace";
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java
new file mode 100644
index 0000000..17b6ffe
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCENS;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorScopeFormat.NAME;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorScopeFormat.NAMESPACE;
+
+/** Scope format for operator metrics that are scoped to a Kubernetes resource namespace. */
+public class KubernetesResourceNamespaceScopeFormat extends ScopeFormat {
+    /** Scope variable holding the namespace of the operator managed custom resource. */
+    public static final String RESOURCE_NS = asVariable("resourcens");
+
+    public KubernetesResourceNamespaceScopeFormat(String format) {
+        super(format, null, new String[] {NAMESPACE, NAME, SCOPE_HOST, RESOURCE_NS});
+    }
+
+    public String[] formatScope(String namespace, String name, String hostname, String resourceNs) {
+        final String[] template = copyTemplate();
+        final String[] values = {namespace, name, hostname, resourceNs};
+        return bindVariables(template, values);
+    }
+
+    public static KubernetesResourceNamespaceScopeFormat fromConfig(Configuration config) {
+        String format = config.getString(SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCENS);
+        return new KubernetesResourceNamespaceScopeFormat(format);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java
new file mode 100644
index 0000000..7f05ab6
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCE;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorScopeFormat.NAME;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorScopeFormat.NAMESPACE;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesResourceNamespaceScopeFormat.RESOURCE_NS;
+
+/** Scope format for operator metrics that are scoped to a single Kubernetes custom resource. */
+public class KubernetesResourceScopeFormat extends ScopeFormat {
+    /** Scope variable holding the name of the operator managed custom resource. */
+    public static final String RESOURCE = asVariable("resourcename");
+
+    public KubernetesResourceScopeFormat(String format) {
+        super(format, null, new String[] {NAMESPACE, NAME, SCOPE_HOST, RESOURCE_NS, RESOURCE});
+    }
+
+    public String[] formatScope(
+            String namespace,
+            String name,
+            String hostname,
+            String resourceNs,
+            String resourceName) {
+        final String[] template = copyTemplate();
+        final String[] values = {namespace, name, hostname, resourceNs, resourceName};
+        return bindVariables(template, values);
+    }
+
+    public static KubernetesResourceScopeFormat fromConfig(Configuration config) {
+        String format = config.getString(SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCE);
+        return new KubernetesResourceScopeFormat(format);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
index 8cd9c3d..18b96b4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.kubernetes.operator.metrics;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
-import org.apache.flink.metrics.MetricGroup;
 
 import io.fabric8.kubernetes.client.CustomResource;
 
@@ -28,12 +28,13 @@ import java.util.concurrent.ConcurrentHashMap;
 
 /** Metric manager for Operator managed custom resources. */
 public class MetricManager<CR extends CustomResource<?, ?>> {
-    public static final String NS_SCOPE_KEY = "resourcens";
-    private final MetricGroup metricGroup;
+    private final KubernetesOperatorMetricGroup opMetricGroup;
+    private final Configuration conf;
     private final Map<String, CustomResourceMetrics> metrics = new ConcurrentHashMap<>();
 
-    public MetricManager(MetricGroup metricGroup) {
-        this.metricGroup = metricGroup;
+    public MetricManager(KubernetesOperatorMetricGroup opMetricGroup, Configuration conf) {
+        this.opMetricGroup = opMetricGroup;
+        this.conf = conf;
     }
 
     public void onUpdate(CR cr) {
@@ -50,12 +51,12 @@ public class MetricManager<CR extends CustomResource<?, ?>> {
     }
 
     private CustomResourceMetrics getCustomResourceMetricsImpl(CR cr) {
+        var namespaceMg =
+                opMetricGroup.createResourceNamespaceGroup(conf, cr.getMetadata().getNamespace());
         if (cr instanceof FlinkDeployment) {
-            return new FlinkDeploymentMetrics(
-                    metricGroup.addGroup(NS_SCOPE_KEY, cr.getMetadata().getNamespace()));
+            return new FlinkDeploymentMetrics(namespaceMg);
         } else if (cr instanceof FlinkSessionJob) {
-            return new FlinkSessionJobMetrics(
-                    metricGroup.addGroup(NS_SCOPE_KEY, cr.getMetadata().getNamespace()));
+            return new FlinkSessionJobMetrics(namespaceMg);
         } else {
             throw new IllegalArgumentException("Unknown CustomResource");
         }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
index 0e53caf..6040d39 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -43,7 +43,7 @@ public class OperatorMetricUtils {
     private static final String OPERATOR_METRICS_PREFIX = "kubernetes.operator.metrics.";
     private static final String METRICS_PREFIX = "metrics.";
 
-    public static MetricGroup initOperatorMetrics(Configuration defaultConfig) {
+    public static KubernetesOperatorMetricGroup initOperatorMetrics(Configuration defaultConfig) {
         Configuration metricConfig = createMetricConfig(defaultConfig);
         LOG.info("Initializing operator metrics using conf: {}", metricConfig);
         PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(metricConfig);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index f8b6290..038ffd7 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -18,9 +18,11 @@
 package org.apache.flink.kubernetes.operator;
 
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -36,6 +38,10 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ContainerStatus;
@@ -421,6 +427,28 @@ public class TestUtils {
         }
     }
 
+    public static <T extends AbstractFlinkResource<?, ?>> MetricManager<T> createTestMetricManager(
+            Configuration conf) {
+        return createTestMetricManager(
+                TestingMetricRegistry.builder()
+                        .setDelimiter('.')
+                        .setRegisterConsumer((metric, name, group) -> {})
+                        .build(),
+                conf);
+    }
+
+    public static <T extends AbstractFlinkResource<?, ?>> MetricManager<T> createTestMetricManager(
+            MetricRegistry metricRegistry, Configuration conf) {
+        // The manager and its operator-level metric group are built from the same config.
+        return new MetricManager<>(createTestMetricGroup(metricRegistry, conf), conf);
+    }
+
+    public static KubernetesOperatorMetricGroup createTestMetricGroup(
+            MetricRegistry metricRegistry, Configuration conf) {
+        return KubernetesOperatorMetricGroup.create(
+                metricRegistry, conf, TEST_NAMESPACE, "testopname", "testhost");
+    }
+
     /** Testing ResponseProvider. */
     public static class ValidatingResponseProvider<T> implements ResponseProvider<Object> {
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
index 8306761..5e9cfcc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.kubernetes.operator;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
-import org.apache.flink.metrics.testutils.MetricListener;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -30,7 +29,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 public class TestingStatusRecorder<STATUS extends CommonStatus<?>> extends StatusRecorder<STATUS> {
 
     public TestingStatusRecorder() {
-        super(null, new MetricManager<>(new MetricListener().getMetricGroup()), (r, s) -> {});
+        super(null, TestUtils.createTestMetricManager(new Configuration()), (r, s) -> {});
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index f78b769..68a29f7 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -17,19 +17,18 @@
 
 package org.apache.flink.kubernetes.operator.controller;
 
+import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
-import org.apache.flink.metrics.testutils.MetricListener;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
@@ -67,7 +66,7 @@ public class TestingFlinkDeploymentController
         statusRecorder =
                 new StatusRecorder<>(
                         kubernetesClient,
-                        new MetricManager<>(new MetricListener().getMetricGroup()),
+                        TestUtils.createTestMetricManager(configManager.getDefaultConfig()),
                         statusUpdateCounter);
         flinkDeploymentController =
                 new FlinkDeploymentController(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
index 10a25da..e68c910 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
@@ -17,14 +17,13 @@
 
 package org.apache.flink.kubernetes.operator.listener;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
-import org.apache.flink.metrics.testutils.MetricListener;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -56,7 +55,7 @@ public class FlinkResourceListenerTest {
         var statusRecorder =
                 StatusRecorder.<FlinkDeploymentStatus>create(
                         kubernetesClient,
-                        new MetricManager<>(new MetricListener().getMetricGroup()),
+                        TestUtils.createTestMetricManager(new Configuration()),
                         listeners);
         var eventRecorder = EventRecorder.create(kubernetesClient, listeners);
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
index 2e1bfc3..b02f112 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
@@ -44,10 +44,12 @@ public class KubernetesOperatorMetricGroupTest {
                         "flink-kubernetes-operator",
                         "localhost");
         assertArrayEquals(
-                new String[] {"localhost", "k8soperator", "default", "flink-kubernetes-operator"},
+                new String[] {
+                    "localhost", "k8soperator", "default", "flink-kubernetes-operator", "system"
+                },
                 group.getScopeComponents());
         assertEquals(
-                "localhost.k8soperator.default.flink-kubernetes-operator.test",
+                "localhost.k8soperator.default.flink-kubernetes-operator.system.test",
                 group.getMetricIdentifier("test"));
 
         assertEquals(
@@ -96,6 +98,57 @@ public class KubernetesOperatorMetricGroupTest {
         registry.shutdown().get();
     }
 
+    /**
+     * Verifies that the resource namespace and resource sub-groups expose the operator variables
+     * plus their own scope variables.
+     */
+    @Test
+    public void testSubGroupVariables() throws Exception {
+        var configuration = new Configuration();
+        var registry = new MetricRegistryImpl(fromConfiguration(configuration));
+        try {
+            var operatorMetricGroup =
+                    KubernetesOperatorMetricGroup.create(
+                            registry,
+                            configuration,
+                            "default",
+                            "flink-kubernetes-operator",
+                            "localhost");
+
+            var namespaceGroup =
+                    operatorMetricGroup.createResourceNamespaceGroup(configuration, "rns");
+            var resourceGroup = namespaceGroup.createResourceNamespaceGroup(configuration, "rn");
+
+            assertEquals(
+                    ImmutableMap.of(
+                            "<host>",
+                            "localhost",
+                            "<namespace>",
+                            "default",
+                            "<name>",
+                            "flink-kubernetes-operator",
+                            "<resourcens>",
+                            "rns"),
+                    namespaceGroup.getAllVariables());
+            assertEquals(
+                    ImmutableMap.of(
+                            "<host>",
+                            "localhost",
+                            "<namespace>",
+                            "default",
+                            "<name>",
+                            "flink-kubernetes-operator",
+                            "<resourcens>",
+                            "rns",
+                            "<resourcename>",
+                            "rn"),
+                    resourceGroup.getAllVariables());
+        } finally {
+            // Always release the registry, even when an assertion above fails.
+            registry.shutdown().get();
+        }
+    }
+
     private static MetricRegistryConfiguration fromConfiguration(Configuration configuration) {
         return MetricRegistryConfiguration.fromConfiguration(configuration, Long.MAX_VALUE);
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
index e3cf2eb..52a2f45 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
@@ -17,21 +17,25 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
-import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
+
 import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.METRIC_GROUP_NAME;
-import static org.apache.flink.kubernetes.operator.metrics.MetricManager.NS_SCOPE_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -47,7 +51,7 @@ public class StatusRecorderTest {
         var helper =
                 new StatusRecorder<FlinkDeploymentStatus>(
                         kubernetesClient,
-                        new MetricManager<>(new MetricListener().getMetricGroup()),
+                        TestUtils.createTestMetricManager(new Configuration()),
                         (e, s) -> {});
         var deployment = TestUtils.buildApplicationCluster();
         kubernetesClient.resource(deployment).createOrReplace();
@@ -70,49 +74,66 @@ public class StatusRecorderTest {
 
     @Test
     public void testFlinkDeploymentMetrics() throws InterruptedException {
-        var metricListener = new MetricListener();
+        // Capture every registered metric under its fully qualified identifier so that the
+        // assertions below can look gauges up by name.
+        var metrics = new HashMap<String, Metric>();
+        TestingMetricRegistry registry =
+                TestingMetricRegistry.builder()
+                        .setDelimiter('.')
+                        .setRegisterConsumer(
+                                (metric, name, group) ->
+                                        metrics.put(group.getMetricIdentifier(name), metric))
+                        .build();
+
+        var metricManager =
+                (MetricManager) TestUtils.createTestMetricManager(registry, new Configuration());
         var helper =
                 new StatusRecorder<FlinkDeploymentStatus>(
-                        kubernetesClient,
-                        new MetricManager<>(metricListener.getMetricGroup()),
-                        (e, s) -> {});
+                        kubernetesClient, metricManager, (e, s) -> {});
 
         var deployment = TestUtils.buildApplicationCluster();
         kubernetesClient.resource(deployment).createOrReplace();
 
         helper.updateStatusFromCache(deployment);
-        assertEquals(1, metricListener.getGauge(totalIdentifier(deployment)).get().getValue());
-        assertEquals(1, metricListener.getGauge(perStatusIdentifier(deployment)).get().getValue());
+        assertEquals(1, ((Gauge<?>) metrics.get(totalIdentifier(deployment))).getValue());
+        assertEquals(1, ((Gauge<?>) metrics.get(perStatusIdentifier(deployment))).getValue());
 
         for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
             deployment.getStatus().setJobManagerDeploymentStatus(status);
             helper.patchAndCacheStatus(deployment);
-            assertEquals(1, metricListener.getGauge(totalIdentifier(deployment)).get().getValue());
-            assertEquals(
-                    1, metricListener.getGauge(perStatusIdentifier(deployment)).get().getValue());
+            assertEquals(1, ((Gauge<?>) metrics.get(totalIdentifier(deployment))).getValue());
+            assertEquals(1, ((Gauge<?>) metrics.get(perStatusIdentifier(deployment))).getValue());
         }
 
         helper.removeCachedStatus(deployment);
-        assertEquals(0, metricListener.getGauge(totalIdentifier(deployment)).get().getValue());
+        assertEquals(0, ((Gauge<?>) metrics.get(totalIdentifier(deployment))).getValue());
         for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
-            assertEquals(
-                    0, metricListener.getGauge(perStatusIdentifier(deployment)).get().getValue());
+            // perStatusIdentifier reads the deployment's current status, so point it at each
+            // status in turn; otherwise every iteration re-checks the same gauge.
+            deployment.getStatus().setJobManagerDeploymentStatus(status);
+            assertEquals(0, ((Gauge<?>) metrics.get(perStatusIdentifier(deployment))).getValue());
         }
     }
 
-    private String[] totalIdentifier(FlinkDeployment deployment) {
-        return new String[] {
-            NS_SCOPE_KEY, deployment.getMetadata().getNamespace(), METRIC_GROUP_NAME, "Count"
-        };
+    /** Returns the identifier of the namespace-level FlinkDeployment {@code Count} gauge. */
+    private String totalIdentifier(FlinkDeployment deployment) {
+        return metricIdentifier(
+                "namespace", deployment.getMetadata().getNamespace(), METRIC_GROUP_NAME, "Count");
     }
 
-    private String[] perStatusIdentifier(FlinkDeployment deployment) {
-        return new String[] {
-            NS_SCOPE_KEY,
-            deployment.getMetadata().getNamespace(),
-            METRIC_GROUP_NAME,
-            deployment.getStatus().getJobManagerDeploymentStatus().name(),
-            "Count"
-        };
+    /** Returns the {@code Count} gauge identifier for the deployment's current JM status. */
+    private String perStatusIdentifier(FlinkDeployment deployment) {
+        return metricIdentifier(
+                "namespace",
+                deployment.getMetadata().getNamespace(),
+                METRIC_GROUP_NAME,
+                deployment.getStatus().getJobManagerDeploymentStatus().name(),
+                "Count");
     }
+
+    /** Joins the scope components with '.' and prepends the operator base scope used here. */
+    private static String metricIdentifier(String... metricScope) {
+        return "testhost.k8soperator.flink-operator-test.testopname."
+                + String.join(".", metricScope);
+    }
 }