You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/09/29 19:15:48 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-29474] Fix global lifecycle metrics group to include resource type

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ae66dbd [FLINK-29474] Fix global lifecycle metrics group to include resource type
3ae66dbd is described below

commit 3ae66dbddd4126a19c1e8f0105027bc370f5e753
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Sep 29 16:48:44 2022 +0200

    [FLINK-29474] Fix global lifecycle metrics group to include resource type
---
 .../metrics/lifecycle/LifecycleMetrics.java        |  8 ++-
 .../lifecycle/ResourceLifecycleMetricsTest.java    | 61 +++++++++++++++++++---
 2 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
index dde85075..8633a8f1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
@@ -160,7 +160,9 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
                                 name ->
                                         Tuple2.of(
                                                 createTransitionHistogram(
-                                                        name, operatorMetricGroup),
+                                                        name,
+                                                        operatorMetricGroup.addGroup(
+                                                                cr.getClass().getSimpleName())),
                                                 new ConcurrentHashMap<>())));
 
         this.stateTimeMetrics = new ConcurrentHashMap<>();
@@ -168,7 +170,9 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
             stateTimeMetrics.put(
                     state,
                     Tuple2.of(
-                            createStateTimeHistogram(state, operatorMetricGroup),
+                            createStateTimeHistogram(
+                                    state,
+                                    operatorMetricGroup.addGroup(cr.getClass().getSimpleName())),
                             new ConcurrentHashMap<>()));
         }
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
index 3240dc6c..cc41bc80 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -23,13 +23,16 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.metrics.Histogram;
 
@@ -52,6 +55,7 @@ import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLif
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for resource lifecycle metrics. */
 public class ResourceLifecycleMetricsTest {
@@ -129,7 +133,7 @@ public class ResourceLifecycleMetricsTest {
         lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
         lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
         lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
-        lifecycleTracker.onUpdate(STABLE, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(STABLE, Instant.ofEpochMilli(ts + 1000));
 
         validateTransition(transitionHistos, "Resume", 1, 4);
         validateTransition(transitionHistos, "Upgrade", 1, 5);
@@ -225,12 +229,57 @@ public class ResourceLifecycleMetricsTest {
         metricManager.onUpdate(dep3);
     }
 
-    public static LifecycleMetrics<FlinkDeployment> getLifeCycleMetrics(
-            MetricManager<FlinkDeployment> metricManager) {
-        for (CustomResourceMetrics<FlinkDeployment> metrics :
-                metricManager.getRegisteredMetrics()) {
+    @Test
+    public void testGlobalHistoNames() {
+        var conf = new Configuration();
+        var testingMetricListener = new TestingMetricListener(new Configuration());
+        var deploymentMetricManager =
+                MetricManager.createFlinkDeploymentMetricManager(
+                        new FlinkConfigManager(conf), testingMetricListener.getMetricGroup());
+        var deploymentLifecycleMetrics = getLifeCycleMetrics(deploymentMetricManager);
+        deploymentLifecycleMetrics.onUpdate(TestUtils.buildApplicationCluster());
+        testGlobalHistoNames(testingMetricListener, FlinkDeployment.class);
+
+        var sessionJobMetricManager =
+                MetricManager.createFlinkSessionJobMetricManager(
+                        new FlinkConfigManager(conf), testingMetricListener.getMetricGroup());
+        var sessionJobLifecycleMetrics = getLifeCycleMetrics(sessionJobMetricManager);
+        sessionJobLifecycleMetrics.onUpdate(TestUtils.buildSessionJob());
+
+        testGlobalHistoNames(testingMetricListener, FlinkSessionJob.class);
+    }
+
+    private void testGlobalHistoNames(TestingMetricListener metricListener, Class<?> resoureClass) {
+        for (var state : ResourceLifecycleState.values()) {
+            assertTrue(
+                    metricListener
+                            .getHistogram(
+                                    String.format(
+                                            metricListener.getMetricId(
+                                                    "%s.Lifecycle.State.%s.TimeSeconds"),
+                                            resoureClass.getSimpleName(),
+                                            state))
+                            .isPresent());
+        }
+
+        for (var transition : LifecycleMetrics.TRACKED_TRANSITIONS) {
+            assertTrue(
+                    metricListener
+                            .getHistogram(
+                                    String.format(
+                                            metricListener.getMetricId(
+                                                    "%s.Lifecycle.Transition.%s.TimeSeconds"),
+                                            resoureClass.getSimpleName(),
+                                            transition.metricName))
+                            .isPresent());
+        }
+    }
+
+    public static <T extends AbstractFlinkResource<?, ?>> LifecycleMetrics<T> getLifeCycleMetrics(
+            MetricManager<T> metricManager) {
+        for (CustomResourceMetrics<?> metrics : metricManager.getRegisteredMetrics()) {
             if (metrics instanceof LifecycleMetrics) {
-                return (LifecycleMetrics<FlinkDeployment>) metrics;
+                return (LifecycleMetrics<T>) metrics;
             }
         }
         return null;