You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2022/11/02 12:20:52 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-29655][hotfix] Move ResourceLifecycleState to api module

This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new b87391f7 [FLINK-29655][hotfix] Move ResourceLifecycleState to api module
b87391f7 is described below

commit b87391f771bf55418df712ce59d31b93d588ad81
Author: Márton Balassi <ma...@apple.com>
AuthorDate: Wed Nov 2 13:20:47 2022 +0100

    [FLINK-29655][hotfix] Move ResourceLifecycleState to api module
---
 .../api}/lifecycle/ResourceLifecycleState.java     |  2 +-
 .../operator/api/status/CommonStatus.java          | 43 +++++++++++++++++++++
 .../kubernetes/operator/listener/AuditUtils.java   |  3 +-
 .../operator/metrics/OperatorMetricUtils.java      | 44 ----------------------
 .../metrics/lifecycle/LifecycleMetrics.java        | 20 +++++-----
 .../lifecycle/ResourceLifecycleMetricTracker.java  |  1 +
 .../kubernetes/operator/utils/StatusRecorder.java  |  6 +--
 .../controller/FlinkDeploymentControllerTest.java  |  6 +--
 .../lifecycle/ResourceLifecycleMetricsTest.java    | 37 +++++++++---------
 9 files changed, 79 insertions(+), 83 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/lifecycle/ResourceLifecycleState.java
similarity index 97%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
rename to flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/lifecycle/ResourceLifecycleState.java
index 324f5346..51d52a4b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/lifecycle/ResourceLifecycleState.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+package org.apache.flink.kubernetes.operator.api.lifecycle;
 
 import lombok.Getter;
 
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
index 165fb1cd..8e7dfdce 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
@@ -18,12 +18,16 @@
 package org.apache.flink.kubernetes.operator.api.status;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobState;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.experimental.SuperBuilder;
+import org.apache.commons.lang3.StringUtils;
 
 /** Last observed common status of the Flink deployment/Flink SessionJob. */
 @Experimental
@@ -45,4 +49,43 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
      * @return Current {@link ReconciliationStatus}.
      */
     public abstract ReconciliationStatus<SPEC> getReconciliationStatus();
+
+    /** Returns the lifecycle state derived from the reconciliation, error and job status. */
+    @JsonIgnore
+    public ResourceLifecycleState getLifecycleState() {
+        var reconciliationStatus = getReconciliationStatus();
+
+        if (reconciliationStatus.isBeforeFirstDeployment()) {
+            return StringUtils.isEmpty(error)
+                    ? ResourceLifecycleState.CREATED
+                    : ResourceLifecycleState.FAILED;
+        }
+
+        switch (reconciliationStatus.getState()) {
+            case UPGRADING:
+                return ResourceLifecycleState.UPGRADING;
+            case ROLLING_BACK:
+                return ResourceLifecycleState.ROLLING_BACK;
+        }
+
+        var lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
+        if (lastReconciledSpec.getJob() != null
+                && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED) {
+            return ResourceLifecycleState.SUSPENDED;
+        }
+
+        /* Ideally we should compare via org.apache.flink.api.common.JobStatus.valueOf(...),
+        however this would introduce a dependency on flink-core */
+        if ("FAILED".equals(getJobStatus().getState())) {
+            return ResourceLifecycleState.FAILED;
+        }
+
+        if (reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK) {
+            return ResourceLifecycleState.ROLLED_BACK;
+        } else if (reconciliationStatus.isLastReconciledSpecStable()) {
+            return ResourceLifecycleState.STABLE;
+        }
+
+        return ResourceLifecycleState.DEPLOYED;
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java
index 743d8008..942d7a9c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java
@@ -19,7 +19,6 @@ package org.apache.flink.kubernetes.operator.listener;
 
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
-import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
 
 import io.fabric8.kubernetes.api.model.Event;
 import lombok.NonNull;
@@ -42,7 +41,7 @@ public class AuditUtils {
     }
 
     private static String format(@NonNull CommonStatus<?> status) {
-        var lifeCycleState = OperatorMetricUtils.getLifecycleState(status);
+        var lifeCycleState = status.getLifecycleState();
         return String.format(
                 ">>> Status | %-7s | %-15s | %s ",
                 StringUtils.isEmpty(status.getError()) ? "Info" : "Error",
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
index c51b8dc9..155817a8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -21,11 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
-import org.apache.flink.kubernetes.operator.api.spec.JobState;
-import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
-import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
@@ -41,7 +37,6 @@ import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.ReporterSetup;
 import org.apache.flink.runtime.metrics.util.MetricUtils;
 
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,45 +93,6 @@ public class OperatorMetricUtils {
         return Configuration.fromMap(metricConf);
     }
 
-    public static <STATUS extends CommonStatus<?>> ResourceLifecycleState getLifecycleState(
-            STATUS status) {
-        var reconciliationStatus = status.getReconciliationStatus();
-
-        if (reconciliationStatus.isBeforeFirstDeployment()) {
-            return StringUtils.isEmpty(status.getError())
-                    ? ResourceLifecycleState.CREATED
-                    : ResourceLifecycleState.FAILED;
-        }
-
-        switch (reconciliationStatus.getState()) {
-            case UPGRADING:
-                return ResourceLifecycleState.UPGRADING;
-            case ROLLING_BACK:
-                return ResourceLifecycleState.ROLLING_BACK;
-        }
-
-        var lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
-        if (lastReconciledSpec.getJob() != null
-                && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED) {
-            return ResourceLifecycleState.SUSPENDED;
-        }
-
-        var jobState = status.getJobStatus().getState();
-        /* Ideally we should compare via org.apache.flink.api.common.JobStatus.valueOf(jobState),
-        however this would introduce a dependency on flink-core */
-        if (jobState != null && jobState.equals("FAILED")) {
-            return ResourceLifecycleState.FAILED;
-        }
-
-        if (reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK) {
-            return ResourceLifecycleState.ROLLED_BACK;
-        } else if (reconciliationStatus.isLastReconciledSpecStable()) {
-            return ResourceLifecycleState.STABLE;
-        }
-
-        return ResourceLifecycleState.DEPLOYED;
-    }
-
     private static MetricRegistryImpl createMetricRegistry(
             Configuration configuration, PluginManager pluginManager) {
         return new MetricRegistryImpl(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
index 084a664f..0a5a2f5d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.metrics.lifecycle;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
@@ -41,13 +42,13 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.CREATED;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.DEPLOYED;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLED_BACK;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLING_BACK;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.STABLE;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.SUSPENDED;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.UPGRADING;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.CREATED;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.DEPLOYED;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.ROLLED_BACK;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.ROLLING_BACK;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.STABLE;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.SUSPENDED;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.UPGRADING;
 
 /**
  * Utility for tracking resource lifecycle metrics globally and per namespace.
@@ -96,8 +97,7 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
     @Override
     public void onUpdate(CR cr) {
         var status = cr.getStatus();
-        getLifecycleMetricTracker(cr)
-                .onUpdate(OperatorMetricUtils.getLifecycleState(status), clock.instant());
+        getLifecycleMetricTracker(cr).onUpdate(status.getLifecycleState(), clock.instant());
     }
 
     @Override
@@ -112,7 +112,7 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
         return lifecycleTrackers.computeIfAbsent(
                 Tuple2.of(cr.getMetadata().getNamespace(), cr.getMetadata().getName()),
                 k -> {
-                    var initialState = OperatorMetricUtils.getLifecycleState(cr.getStatus());
+                    var initialState = cr.getStatus().getLifecycleState();
                     var time =
                             initialState == CREATED
                                     ? Instant.parse(cr.getMetadata().getCreationTimestamp())
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java
index bcba1e2a..deec24f9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.metrics.lifecycle;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.metrics.Histogram;
 
 import org.slf4j.Logger;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index 26ca5671..c0b48faf 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -21,14 +21,13 @@ package org.apache.flink.kubernetes.operator.utils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.listener.AuditUtils;
 import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
-import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
-import org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -142,8 +141,7 @@ public class StatusRecorder<
         } else {
             // Initialize cache with current status copy
             statusCache.put(key, objectMapper.convertValue(resource.getStatus(), ObjectNode.class));
-            if (ResourceLifecycleState.CREATED.equals(
-                    OperatorMetricUtils.getLifecycleState(resource.getStatus()))) {
+            if (ResourceLifecycleState.CREATED.equals(resource.getStatus().getLifecycleState())) {
                 statusUpdateListener.accept(resource, resource.getStatus());
             }
         }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 2be7d715..2bb1f02c 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
@@ -34,8 +35,6 @@ import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.api.status.TaskManagerInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
-import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
-import org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -988,8 +987,7 @@ public class FlinkDeploymentControllerTest {
 
         assertEquals(1, testController.events().size());
         assertEquals(
-                ResourceLifecycleState.FAILED,
-                OperatorMetricUtils.getLifecycleState(flinkDeployment.getStatus()));
+                ResourceLifecycleState.FAILED, flinkDeployment.getStatus().getLifecycleState());
 
         var event = testController.events().remove();
         assertEquals("Warning", event.getType());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
index 52452d9c..90e72524 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
@@ -44,14 +45,14 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.LongStream;
 
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.CREATED;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.DEPLOYED;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.FAILED;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLED_BACK;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLING_BACK;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.STABLE;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.SUSPENDED;
-import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.UPGRADING;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.CREATED;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.DEPLOYED;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.FAILED;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.ROLLED_BACK;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.ROLLING_BACK;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.STABLE;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.SUSPENDED;
+import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.UPGRADING;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -63,22 +64,22 @@ public class ResourceLifecycleMetricsTest {
     @Test
     public void lifecycleStateTest() {
         var application = TestUtils.buildApplicationCluster();
-        assertEquals(CREATED, OperatorMetricUtils.getLifecycleState(application.getStatus()));
+        assertEquals(CREATED, application.getStatus().getLifecycleState());
 
         ReconciliationUtils.updateStatusBeforeDeploymentAttempt(application, new Configuration());
-        assertEquals(UPGRADING, OperatorMetricUtils.getLifecycleState(application.getStatus()));
+        assertEquals(UPGRADING, application.getStatus().getLifecycleState());
 
         ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration());
-        assertEquals(DEPLOYED, OperatorMetricUtils.getLifecycleState(application.getStatus()));
+        assertEquals(DEPLOYED, application.getStatus().getLifecycleState());
 
         application.getStatus().getReconciliationStatus().markReconciledSpecAsStable();
-        assertEquals(STABLE, OperatorMetricUtils.getLifecycleState(application.getStatus()));
+        assertEquals(STABLE, application.getStatus().getLifecycleState());
 
         application.getStatus().setError("errr");
-        assertEquals(STABLE, OperatorMetricUtils.getLifecycleState(application.getStatus()));
+        assertEquals(STABLE, application.getStatus().getLifecycleState());
 
         application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
-        assertEquals(FAILED, OperatorMetricUtils.getLifecycleState(application.getStatus()));
+        assertEquals(FAILED, application.getStatus().getLifecycleState());
 
         application.getStatus().setError("");
 
@@ -86,19 +87,19 @@ public class ResourceLifecycleMetricsTest {
                 .getStatus()
                 .getReconciliationStatus()
                 .setState(ReconciliationState.ROLLING_BACK);
-        assertEquals(ROLLING_BACK, OperatorMetricUtils.getLifecycleState(application.getStatus()));
+        assertEquals(ROLLING_BACK, application.getStatus().getLifecycleState());
 
         application.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
         application.getStatus().getReconciliationStatus().setState(ReconciliationState.ROLLED_BACK);
-        assertEquals(ROLLED_BACK, OperatorMetricUtils.getLifecycleState(application.getStatus()));
+        assertEquals(ROLLED_BACK, application.getStatus().getLifecycleState());
 
         application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
-        assertEquals(FAILED, OperatorMetricUtils.getLifecycleState(application.getStatus()));
+        assertEquals(FAILED, application.getStatus().getLifecycleState());
 
         application.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
         application.getSpec().getJob().setState(JobState.SUSPENDED);
         ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration());
-        assertEquals(SUSPENDED, OperatorMetricUtils.getLifecycleState(application.getStatus()));
+        assertEquals(SUSPENDED, application.getStatus().getLifecycleState());
     }
 
     @Test