Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/13 11:50:45 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28479] Add metrics for resource lifecycle state transitions

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 924b4f0  [FLINK-28479] Add metrics for resource lifecycle state transitions
924b4f0 is described below

commit 924b4f0fb04eb6ca26dcedf83c1a852fc81807a5
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Wed Jul 13 13:12:43 2022 +0200

    [FLINK-28479] Add metrics for resource lifecycle state transitions
---
 docs/content/docs/operations/metrics-logging.md    |  38 ++-
 .../kubernetes_operator_metric_configuration.html  |  12 +
 .../flink/kubernetes/operator/FlinkOperator.java   |  11 +-
 .../operator/config/FlinkConfigManager.java        |   2 +-
 .../operator/crd/status/CommonStatus.java          |  40 +++
 .../operator/metrics/KubernetesClientMetrics.java  |  42 ++--
 .../metrics/KubernetesOperatorMetricOptions.java   |  14 ++
 .../kubernetes/operator/metrics/MetricManager.java |  42 +++-
 .../operator/metrics/OperatorJosdkMetrics.java     |  26 +-
 .../operator/metrics/OperatorMetricUtils.java      | 121 +++++++++
 .../metrics/lifecycle/LifecycleMetrics.java        | 278 +++++++++++++++++++++
 .../lifecycle/ResourceLifecycleMetricTracker.java  | 142 +++++++++++
 .../metrics/lifecycle/ResourceLifecycleState.java  |  60 +++++
 .../flink/kubernetes/operator/TestUtils.java       |   4 +-
 .../operator/metrics/OperatorJosdkMetricsTest.java |   3 +-
 .../lifecycle/ResourceLifecycleMetricsTest.java    | 276 ++++++++++++++++++++
 16 files changed, 1061 insertions(+), 50 deletions(-)

diff --git a/docs/content/docs/operations/metrics-logging.md b/docs/content/docs/operations/metrics-logging.md
index 4769381..5f01982 100644
--- a/docs/content/docs/operations/metrics-logging.md
+++ b/docs/content/docs/operations/metrics-logging.md
@@ -28,14 +28,40 @@ under the License.
 
 The Flink Kubernetes Operator (Operator) extends the [Flink Metric System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/) that allows gathering and exposing metrics to centralized monitoring solutions. 
 
-## Deployment Metrics
+## Flink Resource Metrics
 The Operator gathers aggregate metrics about managed resources.
 
-| Scope     | Metrics                                           | Description                                                                                                                                                 | Type  |
-|-----------|---------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|
-| Namespace | FlinkDeployment.Count                             | Number of managed FlinkDeployment instances per namespace                                                                                                   | Gauge |
-| Namespace | FlinkDeployment.JmDeploymentStatus.<Status>.Count | Number of managed FlinkDeployment resources per <Status> per namespace. <Status> can take values from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
-| Namespace | FlinkSessionJob.Count                             | Number of managed FlinkSessionJob instances per namespace                                                                                                   | Gauge |
+| Scope            | Metrics                                                                       | Description                                                                                                                                                                            | Type      |
+|------------------|-------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
+| Namespace        | FlinkDeployment/FlinkSessionJob.Count                                         | Number of managed FlinkDeployment/SessionJob instances per namespace                                                                                                                   | Gauge     |
+| Namespace        | FlinkDeployment.JmDeploymentStatus.<Status>.Count                             | Number of managed FlinkDeployment resources per <Status> per namespace. <Status> can take values from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR                            | Gauge     |
+| Namespace        | FlinkDeployment/FlinkSessionJob.Lifecycle.State.<State>.Count                 | Number of managed resources currently in state <State> per namespace. <State> can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED | Gauge     |
+| System/Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.State.<State>.TimeSeconds           | Time spent in state <State> for a given resource. <State> can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED                     | Histogram |
+| System/Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.Transition.<Transition>.TimeSeconds | Time statistics for selected lifecycle state transitions. <Transition> can take values from: Resume, Upgrade, Suspend, Stabilization, Rollback, Submission                             | Histogram |
+
+### Lifecycle metrics
+
+Based on the resource status the operator tracks the following resource lifecycle states:
+
+ - CREATED : The resource was created in Kubernetes but not yet handled by the operator
+ - SUSPENDED : The (job) resource has been suspended
+ - UPGRADING : The resource is suspended before upgrading to a new spec
+ - DEPLOYED : The resource is deployed/submitted to Kubernetes, but it's not yet considered to be stable and might be rolled back in the future
+ - STABLE : The resource deployment is considered to be stable and won't be rolled back
+ - ROLLING_BACK : The resource is being rolled back to the last stable spec
+ - ROLLED_BACK : The resource is deployed with the last stable spec
+ - FAILED : The job terminally failed
+
+The number of resources and the time spent in each of these states at any given time are tracked by the `Lifecycle.State.<State>.Count` and `Lifecycle.State.<State>.TimeSeconds` metrics.
+
+In addition to the simple counts we further track a few selected state transitions:
+
+ - Upgrade : End-to-end resource upgrade time from stable to stable
+ - Resume : Time from suspended to stable
+ - Suspend : Time for any suspend operation
+ - Stabilization : Time from deployed to stable state
+ - Rollback : Time from deployed to rolled_back state if the resource was rolled back
+ - Submission : Flink resource submission time
 
 ## Kubernetes Client Metrics
 
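The state-time metrics documented above can be pictured with a small sketch. This is an illustration only: `StateTimeSketch` and its methods are hypothetical names, timestamps are passed in by the caller as seconds, and nothing here is taken from the operator's `ResourceLifecycleMetricTracker`. It records one sample per completed visit to a state, which is roughly what a `TimeSeconds` histogram would be fed.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; not the operator's ResourceLifecycleMetricTracker. */
class StateTimeSketch {
    enum State {
        CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED
    }

    private State current;
    private long enteredAtSeconds;
    // Seconds spent in each state, one sample per completed visit.
    private final Map<State, List<Long>> timeInState = new EnumMap<>(State.class);

    StateTimeSketch(State initial, long nowSeconds) {
        this.current = initial;
        this.enteredAtSeconds = nowSeconds;
    }

    /** Records the time spent in the previous state when the state changes. */
    void onUpdate(State next, long nowSeconds) {
        if (next == current) {
            return; // No transition, nothing to record.
        }
        timeInState
                .computeIfAbsent(current, s -> new ArrayList<>())
                .add(nowSeconds - enteredAtSeconds);
        current = next;
        enteredAtSeconds = nowSeconds;
    }

    /** Returns the recorded samples for a state, or an empty list. */
    List<Long> samples(State state) {
        return timeInState.getOrDefault(state, List.of());
    }
}
```

In this sketch, a resource that enters DEPLOYED at second 5 and STABLE at second 35 yields a 30 second DEPLOYED sample, which corresponds to what the documentation calls the Stabilization transition.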
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
index 22fdd1d..9e934ea 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
@@ -26,6 +26,18 @@
             <td>Integer</td>
             <td>Defines the number of measured samples when calculating statistics.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.resource.lifecycle.metrics.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Enable resource lifecycle state metrics. This enables both state and transition counts/histograms.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.resource.lifecycle.namespace.histograms.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>In addition to the system level histograms, enable per namespace tracking of state and transition times.</td>
+        </tr>
         <tr>
             <td><h5>metrics.scope.k8soperator.resource</h5></td>
             <td style="word-wrap: break-word;">"&lt;host&gt;.k8soperator.&lt;namespace&gt;.&lt;name&gt;.resource.&lt;resourcens&gt;.&lt;resourcename&gt;"</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 996e67b..c0bc279 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -113,17 +113,14 @@ public class FlinkOperator {
             overrider.withConcurrentReconciliationThreads(parallelism);
         }
         if (configManager.getOperatorConfiguration().isJosdkMetricsEnabled()) {
-            overrider.withMetrics(
-                    new OperatorJosdkMetrics(metricGroup, configManager.getDefaultConfig()));
+            overrider.withMetrics(new OperatorJosdkMetrics(metricGroup, configManager));
         }
     }
 
     private void registerDeploymentController() {
         var statusRecorder =
                 StatusRecorder.<FlinkDeploymentStatus>create(
-                        client,
-                        new MetricManager<>(metricGroup, configManager.getDefaultConfig()),
-                        listeners);
+                        client, new MetricManager<>(metricGroup, configManager), listeners);
         var eventRecorder = EventRecorder.create(client, listeners);
         var reconcilerFactory =
                 new ReconcilerFactory(
@@ -146,9 +143,7 @@ public class FlinkOperator {
         var eventRecorder = EventRecorder.create(client, listeners);
         var statusRecorder =
                 StatusRecorder.<FlinkSessionJobStatus>create(
-                        client,
-                        new MetricManager<>(metricGroup, configManager.getDefaultConfig()),
-                        listeners);
+                        client, new MetricManager<>(metricGroup, configManager), listeners);
         var reconciler =
                 new SessionJobReconciler(
                         client, flinkService, configManager, eventRecorder, statusRecorder);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index c70dbae..0340891 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -113,7 +113,7 @@ public class FlinkConfigManager {
     }
 
     public Configuration getDefaultConfig() {
-        return defaultConfig;
+        return defaultConfig.clone();
     }
 
     @VisibleForTesting
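The commit message does not say why `getDefaultConfig()` now returns a clone. One plausible reading is that it is a defensive copy, so callers cannot mutate the shared defaults. The sketch below shows that pattern with a plain map; `DefensiveCopySketch` is a hypothetical stand-in, not `FlinkConfigManager`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a config holder; not FlinkConfigManager. */
class DefensiveCopySketch {
    private final Map<String, String> defaultConfig = new HashMap<>();

    DefensiveCopySketch() {
        defaultConfig.put("kubernetes.operator.resource.lifecycle.metrics.enabled", "true");
    }

    /** Returns a copy, so callers cannot mutate the shared defaults. */
    Map<String, String> getDefaultConfig() {
        return new HashMap<>(defaultConfig);
    }
}
```

The cost of this choice is one copy per call, so a caller that reads the defaults in a hot path may prefer to fetch them once and reuse the result.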
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
index 7611b0e..76e0e02 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
@@ -19,7 +19,10 @@ package org.apache.flink.kubernetes.operator.crd.status;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -45,4 +48,41 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
      * @return Current {@link ReconciliationStatus}.
      */
     public abstract ReconciliationStatus<SPEC> getReconciliationStatus();
+
+    @JsonIgnore
+    public ResourceLifecycleState getLifecycleState() {
+        var reconciliationStatus = getReconciliationStatus();
+
+        if (reconciliationStatus.isFirstDeployment()) {
+            return ResourceLifecycleState.CREATED;
+        }
+
+        switch (reconciliationStatus.getState()) {
+            case UPGRADING:
+                return ResourceLifecycleState.UPGRADING;
+            case ROLLING_BACK:
+                return ResourceLifecycleState.ROLLING_BACK;
+        }
+
+        var lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
+        if (lastReconciledSpec.getJob() != null
+                && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED) {
+            return ResourceLifecycleState.SUSPENDED;
+        }
+
+        var jobState = getJobStatus().getState();
+        if (jobState != null
+                && org.apache.flink.api.common.JobStatus.valueOf(jobState)
+                        == org.apache.flink.api.common.JobStatus.FAILED) {
+            return ResourceLifecycleState.FAILED;
+        }
+
+        if (reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK) {
+            return ResourceLifecycleState.ROLLED_BACK;
+        } else if (reconciliationStatus.isLastReconciledSpecStable()) {
+            return ResourceLifecycleState.STABLE;
+        }
+
+        return ResourceLifecycleState.DEPLOYED;
+    }
 }
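The order of checks in `getLifecycleState()` above matters, because several conditions can hold at once. The sketch below restates that precedence as a pure function over simplified inputs. The boolean parameters and the `Reconciliation` enum are hypothetical simplifications of the status objects used in the diff, not the operator's own types.

```java
/** Hypothetical restatement of the check order; inputs are simplified flags. */
class LifecycleStateSketch {
    enum State {
        CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED
    }

    /** Simplified stand-in for the reconciliation state in the resource status. */
    enum Reconciliation { DEPLOYED, UPGRADING, ROLLING_BACK, ROLLED_BACK }

    static State derive(
            boolean firstDeployment,
            Reconciliation reconciliation,
            boolean lastSpecSuspended,
            boolean jobFailed,
            boolean lastSpecStable) {
        if (firstDeployment) {
            return State.CREATED; // Checked first: nothing has been reconciled yet.
        }
        if (reconciliation == Reconciliation.UPGRADING) {
            return State.UPGRADING;
        }
        if (reconciliation == Reconciliation.ROLLING_BACK) {
            return State.ROLLING_BACK;
        }
        if (lastSpecSuspended) {
            return State.SUSPENDED; // Takes precedence over a failed job status.
        }
        if (jobFailed) {
            return State.FAILED;
        }
        if (reconciliation == Reconciliation.ROLLED_BACK) {
            return State.ROLLED_BACK;
        }
        if (lastSpecStable) {
            return State.STABLE;
        }
        return State.DEPLOYED; // Deployed, but not yet marked stable.
    }
}
```

For example, under this ordering a resource whose last reconciled spec is suspended reports SUSPENDED even if its job status is FAILED, and an in-progress upgrade reports UPGRADING regardless of the other flags.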
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
index 1a9733d..fe58226 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
@@ -19,11 +19,11 @@
 package org.apache.flink.kubernetes.operator.metrics;
 
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils.SynchronizedMeterView;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
 
 import okhttp3.Interceptor;
 import okhttp3.Request;
@@ -55,9 +55,9 @@ public class KubernetesClientMetrics implements Interceptor {
     private final Counter failedRequestCounter;
     private final Counter responseCounter;
 
-    private final MeterView requestRateMeter;
-    private final MeterView requestFailedRateMeter;
-    private final MeterView responseRateMeter;
+    private final SynchronizedMeterView requestRateMeter;
+    private final SynchronizedMeterView requestFailedRateMeter;
+    private final SynchronizedMeterView responseRateMeter;
 
     private final Map<Integer, Counter> responseCodeCounters = new ConcurrentHashMap<>();
     private final Map<String, Counter> requestMethodCounter = new ConcurrentHashMap<>();
@@ -70,20 +70,26 @@ public class KubernetesClientMetrics implements Interceptor {
         this.failedRequestMetricGroup = requestMetricGroup.addGroup(HTTP_REQUEST_FAILED_GROUP);
         this.responseMetricGroup = metricGroup.addGroup(HTTP_RESPONSE_GROUP);
 
-        this.requestCounter = requestMetricGroup.counter(COUNTER);
-        this.failedRequestCounter = failedRequestMetricGroup.counter(COUNTER);
-        this.responseCounter = responseMetricGroup.counter(COUNTER);
+        this.requestCounter =
+                OperatorMetricUtils.synchronizedCounter(requestMetricGroup.counter(COUNTER));
+        this.failedRequestCounter =
+                OperatorMetricUtils.synchronizedCounter(failedRequestMetricGroup.counter(COUNTER));
+        this.responseCounter =
+                OperatorMetricUtils.synchronizedCounter(responseMetricGroup.counter(COUNTER));
 
-        this.requestRateMeter = requestMetricGroup.meter(METER, new MeterView(requestCounter));
+        this.requestRateMeter =
+                OperatorMetricUtils.synchronizedMeterView(
+                        requestMetricGroup.meter(METER, new MeterView(requestCounter)));
         this.requestFailedRateMeter =
-                failedRequestMetricGroup.meter(METER, new MeterView(failedRequestCounter));
-        this.responseRateMeter = responseMetricGroup.meter(METER, new MeterView(responseCounter));
+                OperatorMetricUtils.synchronizedMeterView(
+                        failedRequestMetricGroup.meter(METER, new MeterView(failedRequestCounter)));
+        this.responseRateMeter =
+                OperatorMetricUtils.synchronizedMeterView(
+                        responseMetricGroup.meter(METER, new MeterView(responseCounter)));
 
         this.responseLatency =
                 responseMetricGroup.histogram(
-                        HISTO,
-                        new DescriptiveStatisticsHistogram(
-                                flinkOperatorConfiguration.getMetricsHistogramSampleSize()));
+                        HISTO, OperatorMetricUtils.createHistogram(flinkOperatorConfiguration));
 
         Executors.newSingleThreadScheduledExecutor()
                 .scheduleAtFixedRate(this::updateMeters, 0, 1, TimeUnit.SECONDS);
@@ -121,12 +127,18 @@ public class KubernetesClientMetrics implements Interceptor {
 
     private Counter getCounterByRequestMethod(String method) {
         return requestMethodCounter.computeIfAbsent(
-                method, key -> requestMetricGroup.addGroup(key).counter(COUNTER));
+                method,
+                key ->
+                        OperatorMetricUtils.synchronizedCounter(
+                                requestMetricGroup.addGroup(key).counter(COUNTER)));
     }
 
     private Counter getCounterByResponseCode(int code) {
         return responseCodeCounters.computeIfAbsent(
-                code, key -> responseMetricGroup.addGroup(key).counter(COUNTER));
+                code,
+                key ->
+                        OperatorMetricUtils.synchronizedCounter(
+                                responseMetricGroup.addGroup(key).counter(COUNTER)));
     }
 
     private void updateMeters() {
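The hunks above wrap each counter and meter in a synchronized wrapper, presumably because these metrics can be updated from more than one thread. The sketch below shows the general delegate-and-synchronize pattern using only the JDK. The `Counter` interface and both implementations are hypothetical stand-ins, not Flink's metric classes.

```java
/** Hypothetical sketch of a synchronized counter wrapper; uses no Flink classes. */
class SynchronizedCounterSketch {

    /** Minimal counter contract, standing in for a metrics counter interface. */
    interface Counter {
        void inc();

        long getCount();
    }

    /** Plain counter: count++ is a read-modify-write and is not atomic. */
    static class PlainCounter implements Counter {
        private long count;

        @Override
        public void inc() {
            count++;
        }

        @Override
        public long getCount() {
            return count;
        }
    }

    /** Wrapper that serializes all access to the delegate on its own monitor. */
    static class SynchronizedCounter implements Counter {
        private final Counter delegate;

        SynchronizedCounter(Counter delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized void inc() {
            delegate.inc();
        }

        @Override
        public synchronized long getCount() {
            return delegate.getCount();
        }
    }

    /** Increments one wrapped counter from several threads and returns the total. */
    static long countConcurrently(int threads, int incrementsPerThread) {
        Counter counter = new SynchronizedCounter(new PlainCounter());
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.inc();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return counter.getCount();
    }
}
```

The wrapper only protects callers that go through it. Any code that keeps a reference to the unwrapped delegate and updates it directly bypasses the lock, which is why the diff wraps the counters at the point where they are created.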
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index ff6bed7..c8628b3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -37,6 +37,20 @@ public class KubernetesOperatorMetricOptions {
                     .withDescription(
                             "Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server.");
 
+    public static final ConfigOption<Boolean> OPERATOR_LIFECYCLE_METRICS_ENABLED =
+            ConfigOptions.key("kubernetes.operator.resource.lifecycle.metrics.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enable resource lifecycle state metrics. This enables both state and transition counts/histograms.");
+
+    public static final ConfigOption<Boolean> OPERATOR_LIFECYCLE_NAMESPACE_HISTOGRAMS_ENABLED =
+            ConfigOptions.key("kubernetes.operator.resource.lifecycle.namespace.histograms.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "In addition to the system level histograms, enable per namespace tracking of state and transition times.");
+
     public static final ConfigOption<Integer> OPERATOR_METRICS_HISTOGRAM_SAMPLE_SIZE =
             ConfigOptions.key("kubernetes.operator.metrics.histogram.sample.size")
                     .intType()
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
index 18b96b4..307019a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
@@ -17,32 +17,52 @@
 
 package org.apache.flink.kubernetes.operator.metrics;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.metrics.lifecycle.LifecycleMetrics;
 
-import io.fabric8.kubernetes.client.CustomResource;
-
+import java.time.Clock;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /** Metric manager for Operator managed custom resources. */
-public class MetricManager<CR extends CustomResource<?, ?>> {
+public class MetricManager<CR extends AbstractFlinkResource<?, ?>> {
     private final KubernetesOperatorMetricGroup opMetricGroup;
-    private final Configuration conf;
+    private final FlinkConfigManager configManager;
     private final Map<String, CustomResourceMetrics> metrics = new ConcurrentHashMap<>();
 
-    public MetricManager(KubernetesOperatorMetricGroup opMetricGroup, Configuration conf) {
+    private final LifecycleMetrics<CR> lifeCycleMetrics;
+
+    public MetricManager(
+            KubernetesOperatorMetricGroup opMetricGroup, FlinkConfigManager configManager) {
         this.opMetricGroup = opMetricGroup;
-        this.conf = conf;
+        this.configManager = configManager;
+
+        if (configManager
+                .getDefaultConfig()
+                .get(KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED)) {
+            this.lifeCycleMetrics =
+                    new LifecycleMetrics<>(configManager, Clock.systemDefaultZone(), opMetricGroup);
+        } else {
+            this.lifeCycleMetrics = null;
+        }
     }
 
     public void onUpdate(CR cr) {
         getCustomResourceMetrics(cr).onUpdate(cr);
+        if (lifeCycleMetrics != null) {
+            lifeCycleMetrics.onUpdate(cr);
+        }
     }
 
     public void onRemove(CR cr) {
         getCustomResourceMetrics(cr).onRemove(cr);
+        if (lifeCycleMetrics != null) {
+            lifeCycleMetrics.onRemove(cr);
+        }
     }
 
     private CustomResourceMetrics getCustomResourceMetrics(CR cr) {
@@ -52,7 +72,8 @@ public class MetricManager<CR extends CustomResource<?, ?>> {
 
     private CustomResourceMetrics getCustomResourceMetricsImpl(CR cr) {
         var namespaceMg =
-                opMetricGroup.createResourceNamespaceGroup(conf, cr.getMetadata().getNamespace());
+                opMetricGroup.createResourceNamespaceGroup(
+                        configManager.getDefaultConfig(), cr.getMetadata().getNamespace());
         if (cr instanceof FlinkDeployment) {
             return new FlinkDeploymentMetrics(namespaceMg);
         } else if (cr instanceof FlinkSessionJob) {
@@ -61,4 +82,9 @@ public class MetricManager<CR extends CustomResource<?, ?>> {
             throw new IllegalArgumentException("Unknown CustomResource");
         }
     }
+
+    @VisibleForTesting
+    public LifecycleMetrics<CR> getLifeCycleMetrics() {
+        return lifeCycleMetrics;
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
index 538f01b..15a14ca 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
@@ -18,13 +18,12 @@
 package org.apache.flink.kubernetes.operator.metrics;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.util.clock.Clock;
 import org.apache.flink.util.clock.SystemClock;
@@ -51,10 +50,9 @@ public class OperatorJosdkMetrics implements Metrics {
     private static final String RECONCILIATION = "Reconciliation";
     private static final String RESOURCE = "Resource";
     private static final String EVENT = "Event";
-    private static final int WINDOW_SIZE = 1000;
 
     private final KubernetesOperatorMetricGroup operatorMetricGroup;
-    private final Configuration conf;
+    private final FlinkConfigManager configManager;
     private final Clock clock;
 
     private final Map<ResourceID, KubernetesResourceNamespaceMetricGroup> resourceNsMetricGroups =
@@ -73,9 +71,9 @@ public class OperatorJosdkMetrics implements Metrics {
                     "FlinkSessionJob");
 
     public OperatorJosdkMetrics(
-            KubernetesOperatorMetricGroup operatorMetricGroup, Configuration conf) {
+            KubernetesOperatorMetricGroup operatorMetricGroup, FlinkConfigManager configManager) {
         this.operatorMetricGroup = operatorMetricGroup;
-        this.conf = conf;
+        this.configManager = configManager;
         this.clock = SystemClock.getInstance();
     }
 
@@ -87,6 +85,7 @@ public class OperatorJosdkMetrics implements Metrics {
             histogram(execution, execution.successTypeName(result)).update(toSeconds(startTime));
             return result;
         } catch (Exception e) {
+            var h = histogram(execution, "failed");
             histogram(execution, "failed").update(toSeconds(startTime));
             throw e;
         }
@@ -159,7 +158,9 @@ public class OperatorJosdkMetrics implements Metrics {
                 String.join(".", group.getScopeComponents()),
                 s ->
                         finalGroup.histogram(
-                                "TimeSeconds", new DescriptiveStatisticsHistogram(WINDOW_SIZE)));
+                                "TimeSeconds",
+                                OperatorMetricUtils.createHistogram(
+                                        configManager.getOperatorConfiguration())));
     }
 
     private long toSeconds(long startTime) {
@@ -177,7 +178,8 @@ public class OperatorJosdkMetrics implements Metrics {
         }
         var finalGroup = group;
         return counters.computeIfAbsent(
-                String.join(".", group.getScopeComponents()), s -> finalGroup.counter("Count"));
+                String.join(".", group.getScopeComponents()),
+                s -> OperatorMetricUtils.synchronizedCounter(finalGroup.counter("Count")));
     }
 
     private KubernetesResourceNamespaceMetricGroup getResourceNsMg(ResourceID resourceID) {
@@ -185,12 +187,16 @@ public class OperatorJosdkMetrics implements Metrics {
                 resourceID,
                 rid ->
                         operatorMetricGroup.createResourceNamespaceGroup(
-                                conf, rid.getNamespace().orElse("default")));
+                                configManager.getDefaultConfig(),
+                                rid.getNamespace().orElse("default")));
     }
 
     private KubernetesResourceMetricGroup getResourceMg(ResourceID resourceID) {
         return resourceMetricGroups.computeIfAbsent(
                 resourceID,
-                rid -> getResourceNsMg(rid).createResourceNamespaceGroup(conf, rid.getName()));
+                rid ->
+                        getResourceNsMg(rid)
+                                .createResourceNamespaceGroup(
+                                        configManager.getDefaultConfig(), rid.getName()));
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
index 6040d39..0ff3f2b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -21,8 +21,16 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
@@ -85,4 +93,117 @@ public class OperatorMetricUtils {
                 MetricRegistryConfiguration.fromConfiguration(configuration, Long.MAX_VALUE),
                 ReporterSetup.fromConfiguration(configuration, pluginManager));
     }
+
+    public static Histogram synchronizedHistogram(Histogram histogram) {
+        return new SynchronizedHistogram(histogram);
+    }
+
+    public static Counter synchronizedCounter(Counter counter) {
+        return new SynchronizedCounter(counter);
+    }
+
+    public static SynchronizedMeterView synchronizedMeterView(MeterView meterView) {
+        return new SynchronizedMeterView(meterView);
+    }
+
+    public static Histogram createHistogram(FlinkOperatorConfiguration operatorConfiguration) {
+        return synchronizedHistogram(
+                new DescriptiveStatisticsHistogram(
+                        operatorConfiguration.getMetricsHistogramSampleSize()));
+    }
+
+    /** Thread safe {@link Histogram} wrapper. */
+    public static class SynchronizedHistogram implements Histogram {
+
+        private final Histogram delegate;
+
+        public SynchronizedHistogram(Histogram delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public synchronized void update(long l) {
+            delegate.update(l);
+        }
+
+        @Override
+        public synchronized long getCount() {
+            return delegate.getCount();
+        }
+
+        @Override
+        public synchronized HistogramStatistics getStatistics() {
+            return delegate.getStatistics();
+        }
+    }
+
+    /** Thread-safe {@link Counter} wrapper. */
+    public static class SynchronizedCounter implements Counter {
+
+        private final Counter delegate;
+
+        public SynchronizedCounter(Counter delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public synchronized void inc() {
+            delegate.inc();
+        }
+
+        @Override
+        public synchronized void inc(long l) {
+            delegate.inc(l);
+        }
+
+        @Override
+        public synchronized void dec() {
+            delegate.dec();
+        }
+
+        @Override
+        public synchronized void dec(long l) {
+            delegate.dec(l);
+        }
+
+        @Override
+        public synchronized long getCount() {
+            return delegate.getCount();
+        }
+    }
+
+    /** Thread-safe {@link MeterView} wrapper. */
+    public static class SynchronizedMeterView implements Meter, View {
+
+        private final MeterView delegate;
+
+        public SynchronizedMeterView(MeterView delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public synchronized void markEvent() {
+            delegate.markEvent();
+        }
+
+        @Override
+        public synchronized void markEvent(long l) {
+            delegate.markEvent(l);
+        }
+
+        @Override
+        public synchronized double getRate() {
+            return delegate.getRate();
+        }
+
+        @Override
+        public synchronized long getCount() {
+            return delegate.getCount();
+        }
+
+        @Override
+        public synchronized void update() {
+            delegate.update();
+        }
+    }
 }
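
Note on the synchronized wrappers added to OperatorMetricUtils above: they follow a plain decorator pattern in which every method takes the wrapper's monitor before delegating. A minimal standalone sketch of that idea follows. SimpleCounter, PlainCounter and SynchronizedSimpleCounter are hypothetical stand-ins invented for this illustration, not Flink or operator classes.

```java
import java.util.ArrayList;
import java.util.List;

public class SynchronizedCounterSketch {

    /** Hypothetical minimal counter interface, standing in for a metrics Counter. */
    public interface SimpleCounter {
        void inc();

        long getCount();
    }

    /** Not thread safe on its own: count++ is a read-modify-write sequence. */
    public static class PlainCounter implements SimpleCounter {
        private long count;

        @Override
        public void inc() {
            count++;
        }

        @Override
        public long getCount() {
            return count;
        }
    }

    /** Decorator that serializes every call on the wrapper's monitor. */
    public static class SynchronizedSimpleCounter implements SimpleCounter {
        private final SimpleCounter delegate;

        public SynchronizedSimpleCounter(SimpleCounter delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized void inc() {
            delegate.inc();
        }

        @Override
        public synchronized long getCount() {
            return delegate.getCount();
        }
    }

    /** Increments one wrapped counter from several threads and returns the final count. */
    public static long countWithThreads(int threads, int incrementsPerThread) {
        SimpleCounter counter = new SynchronizedSimpleCounter(new PlainCounter());
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread worker =
                    new Thread(
                            () -> {
                                for (int j = 0; j < incrementsPerThread; j++) {
                                    counter.inc();
                                }
                            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.getCount();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10_000)); // prints 40000
    }
}
```

The guarantee only holds while all access goes through the wrapper. Code that keeps a reference to the unwrapped delegate bypasses the lock.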
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
new file mode 100644
index 0000000..c2ef91e
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.CREATED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.DEPLOYED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLED_BACK;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLING_BACK;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.STABLE;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.SUSPENDED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.UPGRADING;
+
+/**
+ * Utility for tracking resource lifecycle metrics globally and per namespace.
+ *
+ * @param <CR> Flink resource type.
+ */
+public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>> {
+
+    private static final String TRANSITION_RESUME = "Resume";
+    private static final String TRANSITION_UPGRADE = "Upgrade";
+    private static final String TRANSITION_SUSPEND = "Suspend";
+    private static final String TRANSITION_SUBMISSION = "Submission";
+    private static final String TRANSITION_STABILIZATION = "Stabilization";
+    private static final String TRANSITION_ROLLBACK = "Rollback";
+
+    public static final List<Transition> TRACKED_TRANSITIONS = getTrackedTransitions();
+
+    private final Map<Tuple2<String, String>, ResourceLifecycleMetricTracker> lifecycleTrackers =
+            new ConcurrentHashMap<>();
+    private final Set<String> namespaces = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final FlinkConfigManager configManager;
+    private final Clock clock;
+    private final KubernetesOperatorMetricGroup operatorMetricGroup;
+    private final boolean namespaceHistosEnabled;
+
+    private Map<String, Tuple2<Histogram, Map<String, Histogram>>> transitionMetrics;
+    private Map<ResourceLifecycleState, Tuple2<Histogram, Map<String, Histogram>>> stateTimeMetrics;
+
+    private Function<MetricGroup, MetricGroup> metricGroupFunction;
+
+    public LifecycleMetrics(
+            FlinkConfigManager configManager,
+            Clock clock,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
+        this.configManager = configManager;
+        this.clock = clock;
+        this.operatorMetricGroup = operatorMetricGroup;
+        this.namespaceHistosEnabled =
+                configManager
+                        .getDefaultConfig()
+                        .get(
+                                KubernetesOperatorMetricOptions
+                                        .OPERATOR_LIFECYCLE_NAMESPACE_HISTOGRAMS_ENABLED);
+    }
+
+    public void onUpdate(CR cr) {
+        getLifecycleMetricTracker(cr).onUpdate(cr.getStatus().getLifecycleState(), clock.instant());
+    }
+
+    public void onRemove(CR cr) {
+        lifecycleTrackers.remove(
+                Tuple2.of(cr.getMetadata().getNamespace(), cr.getMetadata().getName()));
+    }
+
+    private ResourceLifecycleMetricTracker getLifecycleMetricTracker(CR cr) {
+        init(cr);
+        createNamespaceStateCountIfMissing(cr.getMetadata().getNamespace());
+        return lifecycleTrackers.computeIfAbsent(
+                Tuple2.of(cr.getMetadata().getNamespace(), cr.getMetadata().getName()),
+                k -> {
+                    var initialState = cr.getStatus().getLifecycleState();
+                    var time =
+                            initialState == CREATED
+                                    ? Instant.parse(cr.getMetadata().getCreationTimestamp())
+                                    : clock.instant();
+                    return new ResourceLifecycleMetricTracker(
+                            initialState,
+                            time,
+                            getTransitionHistograms(cr),
+                            getStateTimeHistograms(cr));
+                });
+    }
+
+    private void createNamespaceStateCountIfMissing(String namespace) {
+        if (!namespaces.add(namespace)) {
+            return;
+        }
+
+        MetricGroup lifecycleGroup =
+                metricGroupFunction.apply(
+                        operatorMetricGroup.createResourceNamespaceGroup(
+                                configManager.getDefaultConfig(), namespace));
+        for (ResourceLifecycleState state : ResourceLifecycleState.values()) {
+            lifecycleGroup
+                    .addGroup("State")
+                    .addGroup(state.name())
+                    .gauge(
+                            "Count",
+                            () ->
+                                    lifecycleTrackers.values().stream()
+                                            .map(ResourceLifecycleMetricTracker::getCurrentState)
+                                            .filter(s -> s == state)
+                                            .count());
+        }
+    }
+
+    private synchronized void init(CR cr) {
+        if (transitionMetrics != null) {
+            return;
+        }
+        this.metricGroupFunction =
+                mg -> mg.addGroup(cr.getClass().getSimpleName()).addGroup("Lifecycle");
+
+        this.transitionMetrics = new ConcurrentHashMap<>();
+        TRACKED_TRANSITIONS.forEach(
+                t ->
+                        transitionMetrics.computeIfAbsent(
+                                t.metricName,
+                                name ->
+                                        Tuple2.of(
+                                                createTransitionHistogram(
+                                                        name, operatorMetricGroup),
+                                                new ConcurrentHashMap<>())));
+
+        this.stateTimeMetrics = new ConcurrentHashMap<>();
+        for (ResourceLifecycleState state : ResourceLifecycleState.values()) {
+            stateTimeMetrics.put(
+                    state,
+                    Tuple2.of(
+                            createStateTimeHistogram(state, operatorMetricGroup),
+                            new ConcurrentHashMap<>()));
+        }
+    }
+
+    private Map<String, List<Histogram>> getTransitionHistograms(CR cr) {
+        var histos = new HashMap<String, List<Histogram>>();
+        transitionMetrics.forEach(
+                (metricName, t) -> {
+                    histos.put(
+                            metricName,
+                            namespaceHistosEnabled
+                                    ? List.of(
+                                            t.f0,
+                                            t.f1.computeIfAbsent(
+                                                    cr.getMetadata().getNamespace(),
+                                                    ns ->
+                                                            createTransitionHistogram(
+                                                                    metricName,
+                                                                    operatorMetricGroup
+                                                                            .createResourceNamespaceGroup(
+                                                                                    configManager
+                                                                                            .getDefaultConfig(),
+                                                                                    ns))))
+                                    : List.of(t.f0));
+                });
+        return histos;
+    }
+
+    private Map<ResourceLifecycleState, List<Histogram>> getStateTimeHistograms(CR cr) {
+        var histos = new HashMap<ResourceLifecycleState, List<Histogram>>();
+        stateTimeMetrics.forEach(
+                (state, t) -> {
+                    histos.put(
+                            state,
+                            namespaceHistosEnabled
+                                    ? List.of(
+                                            t.f0,
+                                            t.f1.computeIfAbsent(
+                                                    cr.getMetadata().getNamespace(),
+                                                    ns ->
+                                                            createStateTimeHistogram(
+                                                                    state,
+                                                                    operatorMetricGroup
+                                                                            .createResourceNamespaceGroup(
+                                                                                    configManager
+                                                                                            .getDefaultConfig(),
+                                                                                    ns))))
+                                    : List.of(t.f0));
+                });
+        return histos;
+    }
+
+    private Histogram createTransitionHistogram(String metricName, MetricGroup group) {
+        return metricGroupFunction
+                .apply(group)
+                .addGroup("Transition")
+                .addGroup(metricName)
+                .histogram(
+                        "TimeSeconds",
+                        OperatorMetricUtils.createHistogram(
+                                configManager.getOperatorConfiguration()));
+    }
+
+    private Histogram createStateTimeHistogram(ResourceLifecycleState state, MetricGroup group) {
+        return metricGroupFunction
+                .apply(group)
+                .addGroup("State")
+                .addGroup(state.name())
+                .histogram(
+                        "TimeSeconds",
+                        OperatorMetricUtils.createHistogram(
+                                configManager.getOperatorConfiguration()));
+    }
+
+    private static List<Transition> getTrackedTransitions() {
+        return List.of(
+                new Transition(SUSPENDED, STABLE, true, TRANSITION_RESUME),
+                new Transition(STABLE, STABLE, true, TRANSITION_UPGRADE),
+                new Transition(DEPLOYED, UPGRADING, true, TRANSITION_SUSPEND),
+                new Transition(STABLE, UPGRADING, true, TRANSITION_SUSPEND),
+                new Transition(ROLLED_BACK, UPGRADING, true, TRANSITION_SUSPEND),
+                new Transition(DEPLOYED, SUSPENDED, true, TRANSITION_SUSPEND),
+                new Transition(STABLE, SUSPENDED, true, TRANSITION_SUSPEND),
+                new Transition(ROLLED_BACK, SUSPENDED, true, TRANSITION_SUSPEND),
+                new Transition(DEPLOYED, STABLE, false, TRANSITION_STABILIZATION),
+                new Transition(DEPLOYED, ROLLED_BACK, false, TRANSITION_ROLLBACK),
+                new Transition(UPGRADING, DEPLOYED, true, TRANSITION_SUBMISSION),
+                new Transition(ROLLING_BACK, ROLLED_BACK, true, TRANSITION_SUBMISSION));
+    }
+
+    @VisibleForTesting
+    protected Map<Tuple2<String, String>, ResourceLifecycleMetricTracker> getLifecycleTrackers() {
+        return lifecycleTrackers;
+    }
+
+    /**
+     * POJO encapsulating a state transition and whether its time is measured from the last
+     * update of the {@code from} state or from the moment that state was first entered.
+     */
+    @ToString
+    @RequiredArgsConstructor
+    protected static class Transition {
+        public final ResourceLifecycleState from;
+        public final ResourceLifecycleState to;
+        public final boolean measureFromLastUpdate;
+        public final String metricName;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java
new file mode 100644
index 0000000..bcba1e2
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Histogram;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Lifecycle state transition tracker for a single resource. */
+public class ResourceLifecycleMetricTracker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceLifecycleMetricTracker.class);
+
+    private final Map<String, List<Histogram>> transitionHistos;
+    private final Map<ResourceLifecycleState, List<Histogram>> stateTimeHistos;
+
+    private final Map<ResourceLifecycleState, Tuple2<Instant, Instant>> stateTimeMap =
+            new HashMap<>();
+
+    private ResourceLifecycleState currentState;
+
+    public ResourceLifecycleMetricTracker(
+            ResourceLifecycleState initialState,
+            Instant time,
+            Map<String, List<Histogram>> transitionHistos,
+            Map<ResourceLifecycleState, List<Histogram>> stateTimeHistos) {
+        this.transitionHistos = transitionHistos;
+        this.currentState = initialState;
+        this.stateTimeHistos = stateTimeHistos;
+        updateCurrentState(currentState, time);
+    }
+
+    public void onUpdate(ResourceLifecycleState newState, Instant time) {
+        if (newState == currentState) {
+            updateCurrentState(newState, time);
+            return;
+        }
+
+        LifecycleMetrics.TRACKED_TRANSITIONS.stream()
+                .filter(t -> t.to == newState)
+                .forEach(
+                        transition -> {
+                            var fromTimes = stateTimeMap.get(transition.from);
+                            if (fromTimes != null) {
+                                var transitionTime =
+                                        Duration.between(
+                                                        transition.measureFromLastUpdate
+                                                                ? fromTimes.f1
+                                                                : fromTimes.f0,
+                                                        time)
+                                                .toSeconds();
+
+                                LOG.debug(
+                                        "Recording transition time {} for {} ({} -> {})",
+                                        transitionTime,
+                                        transition.metricName,
+                                        transition.from,
+                                        transition.to);
+                                transitionHistos
+                                        .get(transition.metricName)
+                                        .forEach(h -> h.update(transitionTime));
+                            }
+                        });
+
+        updateCurrentState(newState, time);
+    }
+
+    private void updateCurrentState(ResourceLifecycleState newState, Instant time) {
+        stateTimeMap.compute(
+                newState,
+                (s, t) -> {
+                    if (t == null) {
+                        return Tuple2.of(time, time);
+                    } else {
+                        if (newState != currentState) {
+                            t.f0 = time;
+                        }
+                        t.f1 = time;
+                        return t;
+                    }
+                });
+
+        if (newState == currentState) {
+            return;
+        }
+
+        var toClear = newState.getClearedStatesAfterTransition(currentState);
+        LOG.debug(
+                "Transitioned from {} to {}, clearing times for {}",
+                currentState,
+                newState,
+                toClear);
+
+        toClear.forEach(
+                state -> {
+                    var times = stateTimeMap.remove(state);
+                    if (times != null) {
+                        var totalSeconds = Duration.between(times.f0, times.f1).toSeconds();
+                        stateTimeHistos.get(state).forEach(h -> h.update(totalSeconds));
+                    }
+                });
+        currentState = newState;
+    }
+
+    public ResourceLifecycleState getCurrentState() {
+        return currentState;
+    }
+
+    @VisibleForTesting
+    protected Map<String, List<Histogram>> getTransitionHistos() {
+        return transitionHistos;
+    }
+
+    @VisibleForTesting
+    protected Map<ResourceLifecycleState, List<Histogram>> getStateTimeHistos() {
+        return stateTimeHistos;
+    }
+}
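
Note on the tracker above: for each state it keeps a pair of instants (first seen, last seen), and a transition's duration is measured from one of the two depending on measureFromLastUpdate. The standalone sketch below isolates that arithmetic. The class and method names are hypothetical and exist only for this illustration.

```java
import java.time.Duration;
import java.time.Instant;

public class TransitionTimeSketch {

    /**
     * Seconds elapsed for a transition, measured from either the first or the last time the
     * source state was observed. Mirrors the Duration.between(...).toSeconds() call in onUpdate.
     */
    public static long transitionSeconds(
            Instant firstSeen, Instant lastSeen, boolean measureFromLastUpdate, Instant now) {
        return Duration.between(measureFromLastUpdate ? lastSeen : firstSeen, now).toSeconds();
    }

    public static void main(String[] args) {
        // Source state first observed at t=3s, last observed at t=4s, transition seen at t=5s.
        Instant first = Instant.ofEpochSecond(3);
        Instant last = Instant.ofEpochSecond(4);
        Instant now = Instant.ofEpochSecond(5);

        // Measured from first entry (as for DEPLOYED -> STABLE): prints 2
        System.out.println(transitionSeconds(first, last, false, now));
        // Measured from the last update (as for UPGRADING -> DEPLOYED): prints 1
        System.out.println(transitionSeconds(first, last, true, now));
    }
}
```

Measuring from first entry captures the whole time spent in the source state, while measuring from the last update captures only the gap between the final observation of that state and the transition.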
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
new file mode 100644
index 0000000..19c0da3
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+
+/** Enum encapsulating the lifecycle state of a Flink resource. */
+public enum ResourceLifecycleState {
+    CREATED(false),
+    SUSPENDED(true),
+    UPGRADING(false),
+    DEPLOYED(false),
+    STABLE(true),
+    ROLLING_BACK(false),
+    ROLLED_BACK(true),
+    FAILED(true);
+
+    private final boolean terminal;
+
+    ResourceLifecycleState(boolean terminal) {
+        this.terminal = terminal;
+    }
+
+    public Set<ResourceLifecycleState> getClearedStatesAfterTransition(
+            ResourceLifecycleState transitionFrom) {
+        if (this == transitionFrom) {
+            return Collections.emptySet();
+        }
+        var states = EnumSet.allOf(ResourceLifecycleState.class);
+        if (terminal) {
+            states.remove(this);
+            return states;
+        }
+
+        if (this == UPGRADING) {
+            states.remove(UPGRADING);
+            states.remove(transitionFrom);
+            return states;
+        }
+
+        return Collections.emptySet();
+    }
+}
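
Note on getClearedStatesAfterTransition above: entering a terminal state clears the recorded times of every other state, entering UPGRADING clears everything except UPGRADING and the state just left, and any other transition clears nothing. The standalone sketch below copies that rule into a hypothetical State enum (not the operator's class) so it can be run in isolation.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

public class ClearedStatesSketch {

    /** Simplified copy of the enum in the diff above; the flag marks terminal states. */
    public enum State {
        CREATED(false),
        SUSPENDED(true),
        UPGRADING(false),
        DEPLOYED(false),
        STABLE(true),
        ROLLING_BACK(false),
        ROLLED_BACK(true),
        FAILED(true);

        private final boolean terminal;

        State(boolean terminal) {
            this.terminal = terminal;
        }

        /** States whose recorded times are dropped when moving from {@code from} into this state. */
        public Set<State> clearedAfterTransitionFrom(State from) {
            if (this == from) {
                return Collections.emptySet();
            }
            Set<State> states = EnumSet.allOf(State.class);
            if (terminal) {
                states.remove(this);
                return states;
            }
            if (this == UPGRADING) {
                states.remove(UPGRADING);
                states.remove(from);
                return states;
            }
            return Collections.emptySet();
        }
    }

    public static void main(String[] args) {
        // Non-terminal, non-UPGRADING target: nothing is cleared.
        System.out.println(State.DEPLOYED.clearedAfterTransitionFrom(State.UPGRADING));
        // UPGRADING keeps itself and the state it came from.
        System.out.println(State.UPGRADING.clearedAfterTransitionFrom(State.STABLE));
        // Terminal target: everything but the target is cleared.
        System.out.println(State.STABLE.clearedAfterTransitionFrom(State.DEPLOYED));
    }
}
```

Keeping the source state when entering UPGRADING is what lets the tracker still measure transitions such as STABLE -> STABLE (Upgrade) once the resource becomes stable again.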
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index bdefa4b..ebe8abc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
@@ -443,7 +444,8 @@ public class TestUtils {
     public static <T extends AbstractFlinkResource<?, ?>> MetricManager<T> createTestMetricManager(
             MetricRegistry metricRegistry, Configuration conf) {
 
-        return new MetricManager<>(createTestMetricGroup(metricRegistry, conf), conf);
+        var confManager = new FlinkConfigManager(conf);
+        return new MetricManager<>(createTestMetricGroup(metricRegistry, conf), confManager);
     }
 
     public static KubernetesOperatorMetricGroup createTestMetricGroup(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
index dba0a82..fc49c40 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.metrics;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.metrics.Counter;
@@ -68,7 +69,7 @@ public class OperatorJosdkMetricsTest {
         operatorMetrics =
                 new OperatorJosdkMetrics(
                         TestUtils.createTestMetricGroup(registry, new Configuration()),
-                        new Configuration());
+                        new FlinkConfigManager(new Configuration()));
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
new file mode 100644
index 0000000..2e76430
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.metrics.Histogram;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.CREATED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.DEPLOYED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.FAILED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLED_BACK;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLING_BACK;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.STABLE;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.SUSPENDED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.UPGRADING;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/** Test for resource lifecycle metrics. */
+public class ResourceLifecycleMetricsTest {
+
+    @Test
+    public void lifecycleStateTest() {
+        var application = TestUtils.buildApplicationCluster();
+        assertEquals(CREATED, application.getStatus().getLifecycleState());
+
+        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(application, new Configuration());
+        assertEquals(UPGRADING, application.getStatus().getLifecycleState());
+
+        ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration());
+        assertEquals(DEPLOYED, application.getStatus().getLifecycleState());
+
+        application.getStatus().getReconciliationStatus().markReconciledSpecAsStable();
+        assertEquals(STABLE, application.getStatus().getLifecycleState());
+
+        application.getStatus().setError("errr");
+        assertEquals(STABLE, application.getStatus().getLifecycleState());
+
+        application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+        assertEquals(FAILED, application.getStatus().getLifecycleState());
+
+        application.getStatus().setError("");
+
+        application
+                .getStatus()
+                .getReconciliationStatus()
+                .setState(ReconciliationState.ROLLING_BACK);
+        assertEquals(ROLLING_BACK, application.getStatus().getLifecycleState());
+
+        application.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
+        application.getStatus().getReconciliationStatus().setState(ReconciliationState.ROLLED_BACK);
+        assertEquals(ROLLED_BACK, application.getStatus().getLifecycleState());
+
+        application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+        assertEquals(FAILED, application.getStatus().getLifecycleState());
+
+        application.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+        application.getSpec().getJob().setState(JobState.SUSPENDED);
+        ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration());
+        assertEquals(SUSPENDED, application.getStatus().getLifecycleState());
+    }
+
+    @Test
+    public void testLifecycleTracker() {
+        var transitionHistos = initTransitionHistos();
+        var timeHistos = initTimeHistos();
+
+        var lifecycleTracker =
+                new ResourceLifecycleMetricTracker(
+                        CREATED, Instant.ofEpochMilli(0), transitionHistos, timeHistos);
+
+        long ts = 1000;
+        lifecycleTracker.onUpdate(CREATED, Instant.ofEpochMilli(ts));
+        lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(STABLE, Instant.ofEpochMilli(ts += 1000));
+
+        lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(ROLLING_BACK, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(ROLLED_BACK, Instant.ofEpochMilli(ts += 1000));
+
+        lifecycleTracker.onUpdate(SUSPENDED, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(STABLE, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+        lifecycleTracker.onUpdate(STABLE, Instant.ofEpochMilli(ts += 1000));
+
+        validateTransition(transitionHistos, "Resume", 1, 4);
+        validateTransition(transitionHistos, "Upgrade", 1, 5);
+        validateTransition(transitionHistos, "Suspend", 3, 1);
+        validateTransition(transitionHistos, "Stabilization", 3, 2);
+        validateTransition(transitionHistos, "Rollback", 1, 2);
+        validateTransition(transitionHistos, "Submission", 5, 1);
+
+        validateTime(timeHistos, CREATED, 1, 1);
+        validateTime(timeHistos, UPGRADING, 4, 3);
+    }
+
+    @Test
+    public void testLifecycleMetricsConfig() {
+        var dep1 = TestUtils.buildApplicationCluster();
+        dep1.getMetadata().setNamespace("ns1");
+        dep1.getMetadata().setName("n1");
+        var dep2 = TestUtils.buildApplicationCluster();
+        dep2.getMetadata().setNamespace("ns1");
+        dep2.getMetadata().setName("n2");
+        var dep3 = TestUtils.buildApplicationCluster();
+        dep3.getMetadata().setNamespace("ns2");
+        dep3.getMetadata().setName("n3");
+
+        var conf = new Configuration();
+        var metricManager = TestUtils.createTestMetricManager(conf);
+        var lifeCycleMetrics = metricManager.getLifeCycleMetrics();
+
+        metricManager.onUpdate(dep1);
+        metricManager.onUpdate(dep2);
+        metricManager.onUpdate(dep3);
+
+        var trackers = lifeCycleMetrics.getLifecycleTrackers();
+        var tracker1 = trackers.get(Tuple2.of("ns1", "n1"));
+        var tracker2 = trackers.get(Tuple2.of("ns1", "n2"));
+        var tracker3 = trackers.get(Tuple2.of("ns2", "n3"));
+
+        assertEquals(tracker1.getStateTimeHistos(), tracker2.getStateTimeHistos());
+        assertEquals(tracker1.getTransitionHistos(), tracker2.getTransitionHistos());
+        assertNotEquals(tracker1.getStateTimeHistos(), tracker3.getStateTimeHistos());
+        assertNotEquals(tracker1.getTransitionHistos(), tracker3.getTransitionHistos());
+        tracker1.getStateTimeHistos()
+                .forEach(
+                        (k, l) -> {
+                            assertEquals(2, l.size());
+                            assertEquals(l.get(0), tracker3.getStateTimeHistos().get(k).get(0));
+                        });
+
+        tracker1.getTransitionHistos()
+                .forEach(
+                        (k, l) -> {
+                            assertEquals(2, l.size());
+                            assertEquals(l.get(0), tracker3.getTransitionHistos().get(k).get(0));
+                        });
+
+        conf.set(
+                KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_NAMESPACE_HISTOGRAMS_ENABLED,
+                false);
+        metricManager = TestUtils.createTestMetricManager(conf);
+        lifeCycleMetrics = metricManager.getLifeCycleMetrics();
+
+        metricManager.onUpdate(dep1);
+        metricManager.onUpdate(dep2);
+        metricManager.onUpdate(dep3);
+
+        trackers = lifeCycleMetrics.getLifecycleTrackers();
+        assertEquals(
+                trackers.get(Tuple2.of("ns1", "n1")).getStateTimeHistos(),
+                trackers.get(Tuple2.of("ns2", "n3")).getStateTimeHistos());
+        assertEquals(
+                trackers.get(Tuple2.of("ns1", "n1")).getTransitionHistos(),
+                trackers.get(Tuple2.of("ns2", "n3")).getTransitionHistos());
+        trackers.get(Tuple2.of("ns1", "n1"))
+                .getStateTimeHistos()
+                .forEach(
+                        (k, l) -> {
+                            assertEquals(1, l.size());
+                        });
+
+        trackers.get(Tuple2.of("ns1", "n1"))
+                .getTransitionHistos()
+                .forEach(
+                        (k, l) -> {
+                            assertEquals(1, l.size());
+                        });
+
+        conf.set(KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED, false);
+        metricManager = TestUtils.createTestMetricManager(conf);
+        assertNull(metricManager.getLifeCycleMetrics());
+
+        metricManager.onUpdate(dep1);
+        metricManager.onUpdate(dep2);
+        metricManager.onUpdate(dep3);
+    }
+
+    private void validateTransition(
+            Map<String, List<Histogram>> histos, String name, int size, long mean) {
+        histos.get(name)
+                .forEach(
+                        h -> {
+                            var stat = h.getStatistics();
+                            assertEquals(size, stat.size());
+                            assertEquals(mean, stat.getMean());
+                        });
+    }
+
+    private void validateTime(
+            Map<ResourceLifecycleState, List<Histogram>> histos,
+            ResourceLifecycleState state,
+            int size,
+            long sum) {
+        histos.get(state)
+                .forEach(
+                        h -> {
+                            var stat = h.getStatistics();
+                            assertEquals(size, stat.size());
+                            assertEquals(sum, LongStream.of(stat.getValues()).sum());
+                        });
+    }
+
+    private Map<String, List<Histogram>> initTransitionHistos() {
+        var histos = new ConcurrentHashMap<String, List<Histogram>>();
+        LifecycleMetrics.TRACKED_TRANSITIONS.forEach(
+                t ->
+                        histos.computeIfAbsent(
+                                t.metricName,
+                                name ->
+                                        List.of(
+                                                OperatorMetricUtils.createHistogram(
+                                                        FlinkOperatorConfiguration
+                                                                .fromConfiguration(
+                                                                        new Configuration())))));
+        return histos;
+    }
+
+    private Map<ResourceLifecycleState, List<Histogram>> initTimeHistos() {
+        var histos = new ConcurrentHashMap<ResourceLifecycleState, List<Histogram>>();
+        for (ResourceLifecycleState state : ResourceLifecycleState.values()) {
+            histos.put(
+                    state,
+                    List.of(
+                            OperatorMetricUtils.createHistogram(
+                                    FlinkOperatorConfiguration.fromConfiguration(
+                                            new Configuration()))));
+        }
+        return histos;
+    }
+}