You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/13 11:50:45 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28479] Add metrics for resource lifecycle state transitions
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 924b4f0 [FLINK-28479] Add metrics for resource lifecycle state transitions
924b4f0 is described below
commit 924b4f0fb04eb6ca26dcedf83c1a852fc81807a5
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Wed Jul 13 13:12:43 2022 +0200
[FLINK-28479] Add metrics for resource lifecycle state transitions
---
docs/content/docs/operations/metrics-logging.md | 38 ++-
.../kubernetes_operator_metric_configuration.html | 12 +
.../flink/kubernetes/operator/FlinkOperator.java | 11 +-
.../operator/config/FlinkConfigManager.java | 2 +-
.../operator/crd/status/CommonStatus.java | 40 +++
.../operator/metrics/KubernetesClientMetrics.java | 42 ++--
.../metrics/KubernetesOperatorMetricOptions.java | 14 ++
.../kubernetes/operator/metrics/MetricManager.java | 42 +++-
.../operator/metrics/OperatorJosdkMetrics.java | 26 +-
.../operator/metrics/OperatorMetricUtils.java | 121 +++++++++
.../metrics/lifecycle/LifecycleMetrics.java | 278 +++++++++++++++++++++
.../lifecycle/ResourceLifecycleMetricTracker.java | 142 +++++++++++
.../metrics/lifecycle/ResourceLifecycleState.java | 60 +++++
.../flink/kubernetes/operator/TestUtils.java | 4 +-
.../operator/metrics/OperatorJosdkMetricsTest.java | 3 +-
.../lifecycle/ResourceLifecycleMetricsTest.java | 276 ++++++++++++++++++++
16 files changed, 1061 insertions(+), 50 deletions(-)
diff --git a/docs/content/docs/operations/metrics-logging.md b/docs/content/docs/operations/metrics-logging.md
index 4769381..5f01982 100644
--- a/docs/content/docs/operations/metrics-logging.md
+++ b/docs/content/docs/operations/metrics-logging.md
@@ -28,14 +28,40 @@ under the License.
The Flink Kubernetes Operator (Operator) extends the [Flink Metric System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/) that allows gathering and exposing metrics to centralized monitoring solutions.
-## Deployment Metrics
+## Flink Resource Metrics
The Operator gathers aggregated metrics about managed resources.
-| Scope | Metrics | Description | Type |
-|-----------|---------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|
-| Namespace | FlinkDeployment.Count | Number of managed FlinkDeployment instances per namespace | Gauge |
-| Namespace | FlinkDeployment.JmDeploymentStatus.<Status>.Count | Number of managed FlinkDeployment resources per <Status> per namespace. <Status> can take values from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
-| Namespace | FlinkSessionJob.Count | Number of managed FlinkSessionJob instances per namespace | Gauge |
+| Scope | Metrics | Description | Type |
+|------------------|-------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
+| Namespace | FlinkDeployment/FlinkSessionJob.Count | Number of managed FlinkDeployment/SessionJob instances per namespace | Gauge |
+| Namespace | FlinkDeployment.JmDeploymentStatus.<Status>.Count | Number of managed FlinkDeployment resources per <Status> per namespace. <Status> can take values from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
+| Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.State.<State>.Count | Number of managed resources currently in state <State> per namespace. <State> can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED | Gauge |
+| System/Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.State.<State>.TimeSeconds | Time spent in state <State> for a given resource. <State> can take values from: CREATED, SUSPENDED, UPGRADING, DEPLOYED, STABLE, ROLLING_BACK, ROLLED_BACK, FAILED | Histogram |
+| System/Namespace | FlinkDeployment/FlinkSessionJob.Lifecycle.Transition.<Transition>.TimeSeconds | Time statistics for selected lifecycle state transitions. <Transition> can take values from: Resume, Upgrade, Suspend, Stabilization, Rollback, Submission | Histogram |
+
+### Lifecycle metrics
+
+Based on the resource status the operator tracks the following resource lifecycle states:
+
+ - CREATED : The resource was created in Kubernetes but not yet handled by the operator
+ - SUSPENDED : The (job) resource has been suspended
+ - UPGRADING : The resource is suspended before upgrading to a new spec
+ - DEPLOYED : The resource is deployed/submitted to Kubernetes, but it's not yet considered to be stable and might be rolled back in the future
+ - STABLE : The resource deployment is considered to be stable and won't be rolled back
+ - ROLLING_BACK : The resource is being rolled back to the last stable spec
+ - ROLLED_BACK : The resource is deployed with the last stable spec
+ - FAILED : The job terminally failed
+
+The number of resources and the time spent in each of these states at any given time are tracked by the `Lifecycle.State.<State>.Count` and `Lifecycle.State.<State>.TimeSeconds` metrics.
+
+In addition to the simple counts we further track a few selected state transitions:
+
+ - Upgrade : End-to-end resource upgrade time from stable to stable
+ - Resume : Time from suspended to stable
+ - Suspend : Time for any suspend operation
+ - Stabilization : Time from deployed to stable state
+ - Rollback : Time from deployed to rolled_back state if the resource was rolled back
+ - Submission : Flink resource submission time
## Kubernetes Client Metrics
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
index 22fdd1d..9e934ea 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
@@ -26,6 +26,18 @@
<td>Integer</td>
<td>Defines the number of measured samples when calculating statistics.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.resource.lifecycle.metrics.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Enable resource lifecycle state metrics. This enables both state and transition counts/histograms.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.resource.lifecycle.namespace.histograms.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>In addition to the system level histograms, enable per namespace tracking of state and transition times.</td>
+ </tr>
<tr>
<td><h5>metrics.scope.k8soperator.resource</h5></td>
<td style="word-wrap: break-word;">"<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>"</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 996e67b..c0bc279 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -113,17 +113,14 @@ public class FlinkOperator {
overrider.withConcurrentReconciliationThreads(parallelism);
}
if (configManager.getOperatorConfiguration().isJosdkMetricsEnabled()) {
- overrider.withMetrics(
- new OperatorJosdkMetrics(metricGroup, configManager.getDefaultConfig()));
+ overrider.withMetrics(new OperatorJosdkMetrics(metricGroup, configManager));
}
}
private void registerDeploymentController() {
var statusRecorder =
StatusRecorder.<FlinkDeploymentStatus>create(
- client,
- new MetricManager<>(metricGroup, configManager.getDefaultConfig()),
- listeners);
+ client, new MetricManager<>(metricGroup, configManager), listeners);
var eventRecorder = EventRecorder.create(client, listeners);
var reconcilerFactory =
new ReconcilerFactory(
@@ -146,9 +143,7 @@ public class FlinkOperator {
var eventRecorder = EventRecorder.create(client, listeners);
var statusRecorder =
StatusRecorder.<FlinkSessionJobStatus>create(
- client,
- new MetricManager<>(metricGroup, configManager.getDefaultConfig()),
- listeners);
+ client, new MetricManager<>(metricGroup, configManager), listeners);
var reconciler =
new SessionJobReconciler(
client, flinkService, configManager, eventRecorder, statusRecorder);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index c70dbae..0340891 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -113,7 +113,7 @@ public class FlinkConfigManager {
}
public Configuration getDefaultConfig() {
- return defaultConfig;
+ return defaultConfig.clone();
}
@VisibleForTesting
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
index 7611b0e..76e0e02 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
@@ -19,7 +19,10 @@ package org.apache.flink.kubernetes.operator.crd.status;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -45,4 +48,41 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
* @return Current {@link ReconciliationStatus}.
*/
public abstract ReconciliationStatus<SPEC> getReconciliationStatus();
+
+ @JsonIgnore
+ public ResourceLifecycleState getLifecycleState() {
+ var reconciliationStatus = getReconciliationStatus();
+
+ if (reconciliationStatus.isFirstDeployment()) {
+ return ResourceLifecycleState.CREATED;
+ }
+
+ switch (reconciliationStatus.getState()) {
+ case UPGRADING:
+ return ResourceLifecycleState.UPGRADING;
+ case ROLLING_BACK:
+ return ResourceLifecycleState.ROLLING_BACK;
+ }
+
+ var lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
+ if (lastReconciledSpec.getJob() != null
+ && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED) {
+ return ResourceLifecycleState.SUSPENDED;
+ }
+
+ var jobState = getJobStatus().getState();
+ if (jobState != null
+ && org.apache.flink.api.common.JobStatus.valueOf(jobState)
+ == org.apache.flink.api.common.JobStatus.FAILED) {
+ return ResourceLifecycleState.FAILED;
+ }
+
+ if (reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK) {
+ return ResourceLifecycleState.ROLLED_BACK;
+ } else if (reconciliationStatus.isLastReconciledSpecStable()) {
+ return ResourceLifecycleState.STABLE;
+ }
+
+ return ResourceLifecycleState.DEPLOYED;
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
index 1a9733d..fe58226 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
@@ -19,11 +19,11 @@
package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils.SynchronizedMeterView;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import okhttp3.Interceptor;
import okhttp3.Request;
@@ -55,9 +55,9 @@ public class KubernetesClientMetrics implements Interceptor {
private final Counter failedRequestCounter;
private final Counter responseCounter;
- private final MeterView requestRateMeter;
- private final MeterView requestFailedRateMeter;
- private final MeterView responseRateMeter;
+ private final SynchronizedMeterView requestRateMeter;
+ private final SynchronizedMeterView requestFailedRateMeter;
+ private final SynchronizedMeterView responseRateMeter;
private final Map<Integer, Counter> responseCodeCounters = new ConcurrentHashMap<>();
private final Map<String, Counter> requestMethodCounter = new ConcurrentHashMap<>();
@@ -70,20 +70,26 @@ public class KubernetesClientMetrics implements Interceptor {
this.failedRequestMetricGroup = requestMetricGroup.addGroup(HTTP_REQUEST_FAILED_GROUP);
this.responseMetricGroup = metricGroup.addGroup(HTTP_RESPONSE_GROUP);
- this.requestCounter = requestMetricGroup.counter(COUNTER);
- this.failedRequestCounter = failedRequestMetricGroup.counter(COUNTER);
- this.responseCounter = responseMetricGroup.counter(COUNTER);
+ this.requestCounter =
+ OperatorMetricUtils.synchronizedCounter(requestMetricGroup.counter(COUNTER));
+ this.failedRequestCounter =
+ OperatorMetricUtils.synchronizedCounter(failedRequestMetricGroup.counter(COUNTER));
+ this.responseCounter =
+ OperatorMetricUtils.synchronizedCounter(responseMetricGroup.counter(COUNTER));
- this.requestRateMeter = requestMetricGroup.meter(METER, new MeterView(requestCounter));
+ this.requestRateMeter =
+ OperatorMetricUtils.synchronizedMeterView(
+ requestMetricGroup.meter(METER, new MeterView(requestCounter)));
this.requestFailedRateMeter =
- failedRequestMetricGroup.meter(METER, new MeterView(failedRequestCounter));
- this.responseRateMeter = responseMetricGroup.meter(METER, new MeterView(responseCounter));
+ OperatorMetricUtils.synchronizedMeterView(
+ failedRequestMetricGroup.meter(METER, new MeterView(failedRequestCounter)));
+ this.responseRateMeter =
+ OperatorMetricUtils.synchronizedMeterView(
+ responseMetricGroup.meter(METER, new MeterView(responseCounter)));
this.responseLatency =
responseMetricGroup.histogram(
- HISTO,
- new DescriptiveStatisticsHistogram(
- flinkOperatorConfiguration.getMetricsHistogramSampleSize()));
+ HISTO, OperatorMetricUtils.createHistogram(flinkOperatorConfiguration));
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(this::updateMeters, 0, 1, TimeUnit.SECONDS);
@@ -121,12 +127,18 @@ public class KubernetesClientMetrics implements Interceptor {
private Counter getCounterByRequestMethod(String method) {
return requestMethodCounter.computeIfAbsent(
- method, key -> requestMetricGroup.addGroup(key).counter(COUNTER));
+ method,
+ key ->
+ OperatorMetricUtils.synchronizedCounter(
+ requestMetricGroup.addGroup(key).counter(COUNTER)));
}
private Counter getCounterByResponseCode(int code) {
return responseCodeCounters.computeIfAbsent(
- code, key -> responseMetricGroup.addGroup(key).counter(COUNTER));
+ code,
+ key ->
+ OperatorMetricUtils.synchronizedCounter(
+ responseMetricGroup.addGroup(key).counter(COUNTER)));
}
private void updateMeters() {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index ff6bed7..c8628b3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -37,6 +37,20 @@ public class KubernetesOperatorMetricOptions {
.withDescription(
"Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server.");
+ public static final ConfigOption<Boolean> OPERATOR_LIFECYCLE_METRICS_ENABLED =
+ ConfigOptions.key("kubernetes.operator.resource.lifecycle.metrics.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Enable resource lifecycle state metrics. This enables both state and transition counts/histograms.");
+
+ public static final ConfigOption<Boolean> OPERATOR_LIFECYCLE_NAMESPACE_HISTOGRAMS_ENABLED =
+ ConfigOptions.key("kubernetes.operator.resource.lifecycle.namespace.histograms.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "In addition to the system level histograms, enable per namespace tracking of state and transition times.");
+
public static final ConfigOption<Integer> OPERATOR_METRICS_HISTOGRAM_SAMPLE_SIZE =
ConfigOptions.key("kubernetes.operator.metrics.histogram.sample.size")
.intType()
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
index 18b96b4..307019a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
@@ -17,32 +17,52 @@
package org.apache.flink.kubernetes.operator.metrics;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.metrics.lifecycle.LifecycleMetrics;
-import io.fabric8.kubernetes.client.CustomResource;
-
+import java.time.Clock;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** Metric manager for Operator managed custom resources. */
-public class MetricManager<CR extends CustomResource<?, ?>> {
+public class MetricManager<CR extends AbstractFlinkResource<?, ?>> {
private final KubernetesOperatorMetricGroup opMetricGroup;
- private final Configuration conf;
+ private final FlinkConfigManager configManager;
private final Map<String, CustomResourceMetrics> metrics = new ConcurrentHashMap<>();
- public MetricManager(KubernetesOperatorMetricGroup opMetricGroup, Configuration conf) {
+ private final LifecycleMetrics<CR> lifeCycleMetrics;
+
+ public MetricManager(
+ KubernetesOperatorMetricGroup opMetricGroup, FlinkConfigManager configManager) {
this.opMetricGroup = opMetricGroup;
- this.conf = conf;
+ this.configManager = configManager;
+
+ if (configManager
+ .getDefaultConfig()
+ .get(KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED)) {
+ this.lifeCycleMetrics =
+ new LifecycleMetrics<>(configManager, Clock.systemDefaultZone(), opMetricGroup);
+ } else {
+ this.lifeCycleMetrics = null;
+ }
}
public void onUpdate(CR cr) {
getCustomResourceMetrics(cr).onUpdate(cr);
+ if (lifeCycleMetrics != null) {
+ lifeCycleMetrics.onUpdate(cr);
+ }
}
public void onRemove(CR cr) {
getCustomResourceMetrics(cr).onRemove(cr);
+ if (lifeCycleMetrics != null) {
+ lifeCycleMetrics.onRemove(cr);
+ }
}
private CustomResourceMetrics getCustomResourceMetrics(CR cr) {
@@ -52,7 +72,8 @@ public class MetricManager<CR extends CustomResource<?, ?>> {
private CustomResourceMetrics getCustomResourceMetricsImpl(CR cr) {
var namespaceMg =
- opMetricGroup.createResourceNamespaceGroup(conf, cr.getMetadata().getNamespace());
+ opMetricGroup.createResourceNamespaceGroup(
+ configManager.getDefaultConfig(), cr.getMetadata().getNamespace());
if (cr instanceof FlinkDeployment) {
return new FlinkDeploymentMetrics(namespaceMg);
} else if (cr instanceof FlinkSessionJob) {
@@ -61,4 +82,9 @@ public class MetricManager<CR extends CustomResource<?, ?>> {
throw new IllegalArgumentException("Unknown CustomResource");
}
}
+
+ @VisibleForTesting
+ public LifecycleMetrics<CR> getLifeCycleMetrics() {
+ return lifeCycleMetrics;
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
index 538f01b..15a14ca 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
@@ -18,13 +18,12 @@
package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
@@ -51,10 +50,9 @@ public class OperatorJosdkMetrics implements Metrics {
private static final String RECONCILIATION = "Reconciliation";
private static final String RESOURCE = "Resource";
private static final String EVENT = "Event";
- private static final int WINDOW_SIZE = 1000;
private final KubernetesOperatorMetricGroup operatorMetricGroup;
- private final Configuration conf;
+ private final FlinkConfigManager configManager;
private final Clock clock;
private final Map<ResourceID, KubernetesResourceNamespaceMetricGroup> resourceNsMetricGroups =
@@ -73,9 +71,9 @@ public class OperatorJosdkMetrics implements Metrics {
"FlinkSessionJob");
public OperatorJosdkMetrics(
- KubernetesOperatorMetricGroup operatorMetricGroup, Configuration conf) {
+ KubernetesOperatorMetricGroup operatorMetricGroup, FlinkConfigManager configManager) {
this.operatorMetricGroup = operatorMetricGroup;
- this.conf = conf;
+ this.configManager = configManager;
this.clock = SystemClock.getInstance();
}
@@ -87,6 +85,7 @@ public class OperatorJosdkMetrics implements Metrics {
histogram(execution, execution.successTypeName(result)).update(toSeconds(startTime));
return result;
} catch (Exception e) {
+ var h = histogram(execution, "failed");
histogram(execution, "failed").update(toSeconds(startTime));
throw e;
}
@@ -159,7 +158,9 @@ public class OperatorJosdkMetrics implements Metrics {
String.join(".", group.getScopeComponents()),
s ->
finalGroup.histogram(
- "TimeSeconds", new DescriptiveStatisticsHistogram(WINDOW_SIZE)));
+ "TimeSeconds",
+ OperatorMetricUtils.createHistogram(
+ configManager.getOperatorConfiguration())));
}
private long toSeconds(long startTime) {
@@ -177,7 +178,8 @@ public class OperatorJosdkMetrics implements Metrics {
}
var finalGroup = group;
return counters.computeIfAbsent(
- String.join(".", group.getScopeComponents()), s -> finalGroup.counter("Count"));
+ String.join(".", group.getScopeComponents()),
+ s -> OperatorMetricUtils.synchronizedCounter(finalGroup.counter("Count")));
}
private KubernetesResourceNamespaceMetricGroup getResourceNsMg(ResourceID resourceID) {
@@ -185,12 +187,16 @@ public class OperatorJosdkMetrics implements Metrics {
resourceID,
rid ->
operatorMetricGroup.createResourceNamespaceGroup(
- conf, rid.getNamespace().orElse("default")));
+ configManager.getDefaultConfig(),
+ rid.getNamespace().orElse("default")));
}
private KubernetesResourceMetricGroup getResourceMg(ResourceID resourceID) {
return resourceMetricGroups.computeIfAbsent(
resourceID,
- rid -> getResourceNsMg(rid).createResourceNamespaceGroup(conf, rid.getName()));
+ rid ->
+ getResourceNsMg(rid)
+ .createResourceNamespaceGroup(
+ configManager.getDefaultConfig(), rid.getName()));
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
index 6040d39..0ff3f2b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -21,8 +21,16 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
@@ -85,4 +93,117 @@ public class OperatorMetricUtils {
MetricRegistryConfiguration.fromConfiguration(configuration, Long.MAX_VALUE),
ReporterSetup.fromConfiguration(configuration, pluginManager));
}
+
+ public static Histogram synchronizedHistogram(Histogram histogram) {
+ return new SynchronizedHistogram(histogram);
+ }
+
+ public static Counter synchronizedCounter(Counter counter) {
+ return new SynchronizedCounter(counter);
+ }
+
+ public static SynchronizedMeterView synchronizedMeterView(MeterView meterView) {
+ return new SynchronizedMeterView(meterView);
+ }
+
+ public static Histogram createHistogram(FlinkOperatorConfiguration operatorConfiguration) {
+ return synchronizedHistogram(
+ new DescriptiveStatisticsHistogram(
+ operatorConfiguration.getMetricsHistogramSampleSize()));
+ }
+
+ /** Thread safe {@link Histogram} wrapper. */
+ public static class SynchronizedHistogram implements Histogram {
+
+ private final Histogram delegate;
+
+ public SynchronizedHistogram(Histogram delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public synchronized void update(long l) {
+ delegate.update(l);
+ }
+
+ @Override
+ public synchronized long getCount() {
+ return delegate.getCount();
+ }
+
+ @Override
+ public synchronized HistogramStatistics getStatistics() {
+ return delegate.getStatistics();
+ }
+ }
+
+ /** Thread safe {@link Counter} wrapper. */
+ public static class SynchronizedCounter implements Counter {
+
+ private final Counter delegate;
+
+ public SynchronizedCounter(Counter delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public synchronized void inc() {
+ delegate.inc();
+ }
+
+ @Override
+ public synchronized void inc(long l) {
+ delegate.inc(l);
+ }
+
+ @Override
+ public synchronized void dec() {
+ delegate.dec();
+ }
+
+ @Override
+ public synchronized void dec(long l) {
+ delegate.dec(l);
+ }
+
+ @Override
+ public synchronized long getCount() {
+ return delegate.getCount();
+ }
+ }
+
+ /** Thread safe {@link MeterView} wrapper. */
+ public static class SynchronizedMeterView implements Meter, View {
+
+ private final MeterView delegate;
+
+ public SynchronizedMeterView(MeterView delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public synchronized void markEvent() {
+ delegate.markEvent();
+ }
+
+ @Override
+ public synchronized void markEvent(long l) {
+ delegate.markEvent(l);
+ }
+
+ @Override
+ public synchronized double getRate() {
+ return delegate.getRate();
+ }
+
+ @Override
+ public synchronized long getCount() {
+ return delegate.getCount();
+ }
+
+ @Override
+ public synchronized void update() {
+ delegate.update();
+ }
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
new file mode 100644
index 0000000..c2ef91e
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.CREATED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.DEPLOYED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLED_BACK;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLING_BACK;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.STABLE;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.SUSPENDED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.UPGRADING;
+
+/**
+ * Utility for tracking resource lifecycle metrics globally and per namespace.
+ *
+ * <p>One {@link ResourceLifecycleMetricTracker} is kept per resource, keyed by (namespace, name).
+ * Each tracker feeds the operator-wide transition and state-time histograms and, when namespace
+ * histograms are enabled, the histograms of its own namespace as well. In addition, a gauge is
+ * registered per namespace and {@link ResourceLifecycleState} that reports how many tracked
+ * resources of that namespace are currently in the given state.
+ *
+ * @param <CR> Flink resource type.
+ */
+public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>> {
+
+    private static final String TRANSITION_RESUME = "Resume";
+    private static final String TRANSITION_UPGRADE = "Upgrade";
+    private static final String TRANSITION_SUSPEND = "Suspend";
+    private static final String TRANSITION_SUBMISSION = "Submission";
+    private static final String TRANSITION_STABILIZATION = "Stabilization";
+    private static final String TRANSITION_ROLLBACK = "Rollback";
+
+    /** All state transitions for which a transition-time histogram is maintained. */
+    public static final List<Transition> TRACKED_TRANSITIONS = getTrackedTransitions();
+
+    /** Trackers of the currently known resources, keyed by (namespace, name). */
+    private final Map<Tuple2<String, String>, ResourceLifecycleMetricTracker> lifecycleTrackers =
+            new ConcurrentHashMap<>();
+
+    /** Namespaces for which the state count gauges have already been registered. */
+    private final Set<String> namespaces = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final FlinkConfigManager configManager;
+    private final Clock clock;
+    private final KubernetesOperatorMetricGroup operatorMetricGroup;
+    private final boolean namespaceHistosEnabled;
+
+    /** Per metric name: the operator-wide histogram (f0) and the per-namespace histograms (f1). */
+    private Map<String, Tuple2<Histogram, Map<String, Histogram>>> transitionMetrics;
+
+    /** Per state: the operator-wide histogram (f0) and the per-namespace histograms (f1). */
+    private Map<ResourceLifecycleState, Tuple2<Histogram, Map<String, Histogram>>> stateTimeMetrics;
+
+    /** Maps a parent group to its {@code <ResourceType>.Lifecycle} sub group; set by init. */
+    private Function<MetricGroup, MetricGroup> metricGroupFunction;
+
+    public LifecycleMetrics(
+            FlinkConfigManager configManager,
+            Clock clock,
+            KubernetesOperatorMetricGroup operatorMetricGroup) {
+        this.configManager = configManager;
+        this.clock = clock;
+        this.operatorMetricGroup = operatorMetricGroup;
+        this.namespaceHistosEnabled =
+                configManager
+                        .getDefaultConfig()
+                        .get(
+                                KubernetesOperatorMetricOptions
+                                        .OPERATOR_LIFECYCLE_NAMESPACE_HISTOGRAMS_ENABLED);
+    }
+
+    /**
+     * Reports the current lifecycle state of the resource to its tracker, creating the tracker
+     * (and the metrics it needs) on first use.
+     *
+     * @param cr Resource that was updated.
+     */
+    public void onUpdate(CR cr) {
+        getLifecycleMetricTracker(cr).onUpdate(cr.getStatus().getLifecycleState(), clock.instant());
+    }
+
+    /**
+     * Stops tracking the given resource. Metrics that were already registered stay in place.
+     *
+     * @param cr Resource that was removed.
+     */
+    public void onRemove(CR cr) {
+        lifecycleTrackers.remove(
+                Tuple2.of(cr.getMetadata().getNamespace(), cr.getMetadata().getName()));
+    }
+
+    /**
+     * Returns the tracker of the resource, creating it if missing. A new tracker of a resource in
+     * CREATED state starts at the resource creation timestamp, otherwise at the current time.
+     */
+    private ResourceLifecycleMetricTracker getLifecycleMetricTracker(CR cr) {
+        init(cr);
+        createNamespaceStateCountIfMissing(cr.getMetadata().getNamespace());
+        return lifecycleTrackers.computeIfAbsent(
+                Tuple2.of(cr.getMetadata().getNamespace(), cr.getMetadata().getName()),
+                k -> {
+                    var initialState = cr.getStatus().getLifecycleState();
+                    var time =
+                            initialState == CREATED
+                                    ? Instant.parse(cr.getMetadata().getCreationTimestamp())
+                                    : clock.instant();
+                    return new ResourceLifecycleMetricTracker(
+                            initialState,
+                            time,
+                            getTransitionHistograms(cr),
+                            getStateTimeHistograms(cr));
+                });
+    }
+
+    /**
+     * Registers the {@code State.<state>.Count} gauges of the namespace the first time the
+     * namespace is seen.
+     */
+    private void createNamespaceStateCountIfMissing(String namespace) {
+        if (!namespaces.add(namespace)) {
+            return;
+        }
+
+        MetricGroup lifecycleGroup =
+                metricGroupFunction.apply(
+                        operatorMetricGroup.createResourceNamespaceGroup(
+                                configManager.getDefaultConfig(), namespace));
+        for (ResourceLifecycleState state : ResourceLifecycleState.values()) {
+            lifecycleGroup
+                    .addGroup("State")
+                    .addGroup(state.name())
+                    .gauge(
+                            "Count",
+                            () ->
+                                    // The gauge lives in the namespace group, so only resources
+                                    // of this namespace may be counted (f0 of the key).
+                                    lifecycleTrackers.entrySet().stream()
+                                            .filter(e -> namespace.equals(e.getKey().f0))
+                                            .filter(e -> e.getValue().getCurrentState() == state)
+                                            .count());
+        }
+    }
+
+    /**
+     * Lazily creates the operator-wide histograms on the first resource update; the resource type
+     * name used in the metric group is taken from that first resource.
+     */
+    private synchronized void init(CR cr) {
+        if (transitionMetrics != null) {
+            return;
+        }
+        // Capture only the type name so the function does not keep the resource object alive.
+        String resourceTypeName = cr.getClass().getSimpleName();
+        this.metricGroupFunction = mg -> mg.addGroup(resourceTypeName).addGroup("Lifecycle");
+
+        this.transitionMetrics = new ConcurrentHashMap<>();
+        TRACKED_TRANSITIONS.forEach(
+                t ->
+                        // Several transitions share a metric name, create each histogram once.
+                        transitionMetrics.computeIfAbsent(
+                                t.metricName,
+                                name ->
+                                        Tuple2.of(
+                                                createTransitionHistogram(
+                                                        name, operatorMetricGroup),
+                                                new ConcurrentHashMap<>())));
+
+        this.stateTimeMetrics = new ConcurrentHashMap<>();
+        for (ResourceLifecycleState state : ResourceLifecycleState.values()) {
+            stateTimeMetrics.put(
+                    state,
+                    Tuple2.of(
+                            createStateTimeHistogram(state, operatorMetricGroup),
+                            new ConcurrentHashMap<>()));
+        }
+    }
+
+    /**
+     * Collects, per transition metric name, the histograms the tracker of the resource has to
+     * update: the operator-wide one and, if enabled, the one of the resource namespace.
+     */
+    private Map<String, List<Histogram>> getTransitionHistograms(CR cr) {
+        var histos = new HashMap<String, List<Histogram>>();
+        transitionMetrics.forEach(
+                (metricName, t) -> {
+                    if (!namespaceHistosEnabled) {
+                        histos.put(metricName, List.of(t.f0));
+                        return;
+                    }
+                    var namespaceHisto =
+                            t.f1.computeIfAbsent(
+                                    cr.getMetadata().getNamespace(),
+                                    ns ->
+                                            createTransitionHistogram(
+                                                    metricName,
+                                                    operatorMetricGroup
+                                                            .createResourceNamespaceGroup(
+                                                                    configManager
+                                                                            .getDefaultConfig(),
+                                                                    ns)));
+                    histos.put(metricName, List.of(t.f0, namespaceHisto));
+                });
+        return histos;
+    }
+
+    /**
+     * Collects, per lifecycle state, the histograms the tracker of the resource has to update:
+     * the operator-wide one and, if enabled, the one of the resource namespace.
+     */
+    private Map<ResourceLifecycleState, List<Histogram>> getStateTimeHistograms(CR cr) {
+        var histos = new HashMap<ResourceLifecycleState, List<Histogram>>();
+        stateTimeMetrics.forEach(
+                (state, t) -> {
+                    if (!namespaceHistosEnabled) {
+                        histos.put(state, List.of(t.f0));
+                        return;
+                    }
+                    var namespaceHisto =
+                            t.f1.computeIfAbsent(
+                                    cr.getMetadata().getNamespace(),
+                                    ns ->
+                                            createStateTimeHistogram(
+                                                    state,
+                                                    operatorMetricGroup
+                                                            .createResourceNamespaceGroup(
+                                                                    configManager
+                                                                            .getDefaultConfig(),
+                                                                    ns)));
+                    histos.put(state, List.of(t.f0, namespaceHisto));
+                });
+        return histos;
+    }
+
+    /** Registers {@code <ResourceType>.Lifecycle.Transition.<metricName>.TimeSeconds}. */
+    private Histogram createTransitionHistogram(String metricName, MetricGroup group) {
+        return metricGroupFunction
+                .apply(group)
+                .addGroup("Transition")
+                .addGroup(metricName)
+                .histogram(
+                        "TimeSeconds",
+                        OperatorMetricUtils.createHistogram(
+                                configManager.getOperatorConfiguration()));
+    }
+
+    /** Registers {@code <ResourceType>.Lifecycle.State.<state>.TimeSeconds}. */
+    private Histogram createStateTimeHistogram(ResourceLifecycleState state, MetricGroup group) {
+        return metricGroupFunction
+                .apply(group)
+                .addGroup("State")
+                .addGroup(state.name())
+                .histogram(
+                        "TimeSeconds",
+                        OperatorMetricUtils.createHistogram(
+                                configManager.getOperatorConfiguration()));
+    }
+
+    private static List<Transition> getTrackedTransitions() {
+        return List.of(
+                new Transition(SUSPENDED, STABLE, true, TRANSITION_RESUME),
+                new Transition(STABLE, STABLE, true, TRANSITION_UPGRADE),
+                new Transition(DEPLOYED, UPGRADING, true, TRANSITION_SUSPEND),
+                new Transition(STABLE, UPGRADING, true, TRANSITION_SUSPEND),
+                new Transition(ROLLED_BACK, UPGRADING, true, TRANSITION_SUSPEND),
+                new Transition(DEPLOYED, SUSPENDED, true, TRANSITION_SUSPEND),
+                new Transition(STABLE, SUSPENDED, true, TRANSITION_SUSPEND),
+                new Transition(ROLLED_BACK, SUSPENDED, true, TRANSITION_SUSPEND),
+                new Transition(DEPLOYED, STABLE, false, TRANSITION_STABILIZATION),
+                new Transition(DEPLOYED, ROLLED_BACK, false, TRANSITION_ROLLBACK),
+                new Transition(UPGRADING, DEPLOYED, true, TRANSITION_SUBMISSION),
+                new Transition(ROLLING_BACK, ROLLED_BACK, true, TRANSITION_SUBMISSION));
+    }
+
+    @VisibleForTesting
+    protected Map<Tuple2<String, String>, ResourceLifecycleMetricTracker> getLifecycleTrackers() {
+        return lifecycleTrackers;
+    }
+
+    /**
+     * Pojo describing a tracked state transition: the source and target state, the name of the
+     * metric it is reported under, and whether the transition time is measured from the last
+     * update seen in the {@code from} state ({@code true}) or from the moment the {@code from}
+     * state was entered ({@code false}).
+     */
+    @ToString
+    @RequiredArgsConstructor
+    protected static class Transition {
+        public final ResourceLifecycleState from;
+        public final ResourceLifecycleState to;
+        public final boolean measureFromLastUpdate;
+        public final String metricName;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java
new file mode 100644
index 0000000..bcba1e2
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricTracker.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Histogram;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tracks the lifecycle states of one resource and reports transition times and time spent in
+ * states to the histograms handed in at construction.
+ *
+ * <p>For every state that has not been cleared yet, the time the state was entered (f0) and the
+ * time of the last update seen in that state (f1) are remembered.
+ */
+public class ResourceLifecycleMetricTracker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceLifecycleMetricTracker.class);
+
+    private final Map<String, List<Histogram>> transitionHistos;
+    private final Map<ResourceLifecycleState, List<Histogram>> stateTimeHistos;
+
+    /** State -> (time the state was entered, time of the last update in the state). */
+    private final Map<ResourceLifecycleState, Tuple2<Instant, Instant>> stateTimeMap =
+            new HashMap<>();
+
+    private ResourceLifecycleState currentState;
+
+    public ResourceLifecycleMetricTracker(
+            ResourceLifecycleState initialState,
+            Instant time,
+            Map<String, List<Histogram>> transitionHistos,
+            Map<ResourceLifecycleState, List<Histogram>> stateTimeHistos) {
+        this.transitionHistos = transitionHistos;
+        this.stateTimeHistos = stateTimeHistos;
+        this.currentState = initialState;
+        // Same state as the current one: only seeds the entry/last-update times.
+        updateCurrentState(initialState, time);
+    }
+
+    /**
+     * Reports the state observed at the given time. On a state change the matching tracked
+     * transitions are recorded before the bookkeeping is moved to the new state.
+     */
+    public void onUpdate(ResourceLifecycleState newState, Instant time) {
+        if (newState != currentState) {
+            recordTransitionTimes(newState, time);
+        }
+        updateCurrentState(newState, time);
+    }
+
+    /** Updates the histograms of all tracked transitions that end in the new state. */
+    private void recordTransitionTimes(ResourceLifecycleState newState, Instant time) {
+        for (var transition : LifecycleMetrics.TRACKED_TRANSITIONS) {
+            if (transition.to != newState) {
+                continue;
+            }
+            var fromTimes = stateTimeMap.get(transition.from);
+            if (fromTimes == null) {
+                // The source state was never visited or has been cleared already.
+                continue;
+            }
+
+            Instant start = transition.measureFromLastUpdate ? fromTimes.f1 : fromTimes.f0;
+            long transitionTime = Duration.between(start, time).toSeconds();
+
+            LOG.debug(
+                    "Recording transition time {} for {} ({} -> {})",
+                    transitionTime,
+                    transition.metricName,
+                    transition.from,
+                    transition.to);
+            for (Histogram histo : transitionHistos.get(transition.metricName)) {
+                histo.update(transitionTime);
+            }
+        }
+    }
+
+    /**
+     * Refreshes the times of the given state and, on a state change, reports and drops the times
+     * of the states that the new state clears.
+     */
+    private void updateCurrentState(ResourceLifecycleState newState, Instant time) {
+        boolean stateChanged = newState != currentState;
+
+        var times = stateTimeMap.get(newState);
+        if (times == null) {
+            stateTimeMap.put(newState, Tuple2.of(time, time));
+        } else {
+            if (stateChanged) {
+                // Re-entering a state that is still remembered restarts its entry time.
+                times.f0 = time;
+            }
+            times.f1 = time;
+        }
+
+        if (!stateChanged) {
+            return;
+        }
+
+        var toClear = newState.getClearedStatesAfterTransition(currentState);
+        LOG.debug(
+                "Transitioned from {} to {}, clearing times for {}",
+                currentState,
+                newState,
+                toClear);
+
+        for (ResourceLifecycleState state : toClear) {
+            var cleared = stateTimeMap.remove(state);
+            if (cleared == null) {
+                continue;
+            }
+            long totalSeconds = Duration.between(cleared.f0, cleared.f1).toSeconds();
+            for (Histogram histo : stateTimeHistos.get(state)) {
+                histo.update(totalSeconds);
+            }
+        }
+        currentState = newState;
+    }
+
+    public ResourceLifecycleState getCurrentState() {
+        return currentState;
+    }
+
+    @VisibleForTesting
+    protected Map<String, List<Histogram>> getTransitionHistos() {
+        return transitionHistos;
+    }
+
+    @VisibleForTesting
+    protected Map<ResourceLifecycleState, List<Histogram>> getStateTimeHistos() {
+        return stateTimeHistos;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
new file mode 100644
index 0000000..19c0da3
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * Lifecycle states of a Flink resource. Each state carries a flag telling whether it is a
+ * terminal state, which decides what state history is cleared when the state is entered.
+ */
+public enum ResourceLifecycleState {
+    CREATED(false),
+    SUSPENDED(true),
+    UPGRADING(false),
+    DEPLOYED(false),
+    STABLE(true),
+    ROLLING_BACK(false),
+    ROLLED_BACK(true),
+    FAILED(true);
+
+    private final boolean terminal;
+
+    ResourceLifecycleState(boolean terminal) {
+        this.terminal = terminal;
+    }
+
+    /**
+     * Returns the states whose recorded times are cleared when this state is entered.
+     *
+     * <ul>
+     *   <li>Staying in the same state clears nothing.
+     *   <li>Entering a terminal state clears every state except the terminal state itself.
+     *   <li>Entering UPGRADING clears every state except UPGRADING and the state it was entered
+     *       from.
+     *   <li>Entering any other state clears nothing.
+     * </ul>
+     *
+     * @param transitionFrom State that was left.
+     * @return Set of states to clear, possibly empty.
+     */
+    public Set<ResourceLifecycleState> getClearedStatesAfterTransition(
+            ResourceLifecycleState transitionFrom) {
+        boolean selfTransition = this == transitionFrom;
+        boolean clearsHistory = terminal || this == UPGRADING;
+        if (selfTransition || !clearsHistory) {
+            return Collections.emptySet();
+        }
+
+        Set<ResourceLifecycleState> cleared = EnumSet.allOf(ResourceLifecycleState.class);
+        cleared.remove(this);
+        if (!terminal) {
+            // Only UPGRADING gets here: the state it was entered from is kept as well.
+            cleared.remove(transitionFrom);
+        }
+        return cleared;
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index bdefa4b..ebe8abc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
@@ -443,7 +444,8 @@ public class TestUtils {
public static <T extends AbstractFlinkResource<?, ?>> MetricManager<T> createTestMetricManager(
MetricRegistry metricRegistry, Configuration conf) {
- return new MetricManager<>(createTestMetricGroup(metricRegistry, conf), conf);
+ var confManager = new FlinkConfigManager(conf);
+ return new MetricManager<>(createTestMetricGroup(metricRegistry, conf), confManager);
}
public static KubernetesOperatorMetricGroup createTestMetricGroup(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
index dba0a82..fc49c40 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.metrics.Counter;
@@ -68,7 +69,7 @@ public class OperatorJosdkMetricsTest {
operatorMetrics =
new OperatorJosdkMetrics(
TestUtils.createTestMetricGroup(registry, new Configuration()),
- new Configuration());
+ new FlinkConfigManager(new Configuration()));
}
@Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
new file mode 100644
index 0000000..2e76430
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.metrics.Histogram;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.CREATED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.DEPLOYED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.FAILED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLED_BACK;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.ROLLING_BACK;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.STABLE;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.SUSPENDED;
+import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState.UPGRADING;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/** Test for resource lifecycle metrics. */
+public class ResourceLifecycleMetricsTest {
+
+ /**
+ * Walks an application resource through a series of status changes and checks the lifecycle
+ * state reported by its status after each step.
+ */
+ @Test
+ public void lifecycleStateTest() {
+ var application = TestUtils.buildApplicationCluster();
+ assertEquals(CREATED, application.getStatus().getLifecycleState());
+
+ ReconciliationUtils.updateStatusBeforeDeploymentAttempt(application, new Configuration());
+ assertEquals(UPGRADING, application.getStatus().getLifecycleState());
+
+ ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration());
+ assertEquals(DEPLOYED, application.getStatus().getLifecycleState());
+
+ application.getStatus().getReconciliationStatus().markReconciledSpecAsStable();
+ assertEquals(STABLE, application.getStatus().getLifecycleState());
+
+ // Setting an error alone is expected to leave the lifecycle state untouched.
+ application.getStatus().setError("errr");
+ assertEquals(STABLE, application.getStatus().getLifecycleState());
+
+ application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+ assertEquals(FAILED, application.getStatus().getLifecycleState());
+
+ application.getStatus().setError("");
+
+ application
+ .getStatus()
+ .getReconciliationStatus()
+ .setState(ReconciliationState.ROLLING_BACK);
+ assertEquals(ROLLING_BACK, application.getStatus().getLifecycleState());
+
+ application.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
+ application.getStatus().getReconciliationStatus().setState(ReconciliationState.ROLLED_BACK);
+ assertEquals(ROLLED_BACK, application.getStatus().getLifecycleState());
+
+ application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+ assertEquals(FAILED, application.getStatus().getLifecycleState());
+
+ application.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+ application.getSpec().getJob().setState(JobState.SUSPENDED);
+ ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration());
+ assertEquals(SUSPENDED, application.getStatus().getLifecycleState());
+ }
+
+ /**
+ * Feeds a fixed sequence of state updates, spaced one second apart, into a single tracker and
+ * checks the number of samples and the mean / sum recorded in the histograms.
+ */
+ @Test
+ public void testLifecycleTracker() {
+ var transitionHistos = initTransitionHistos();
+ var timeHistos = initTimeHistos();
+
+ var lifecycleTracker =
+ new ResourceLifecycleMetricTracker(
+ CREATED, Instant.ofEpochMilli(0), transitionHistos, timeHistos);
+
+ // Every update below is 1000 ms after the previous one; the order of the calls determines
+ // the expected values asserted at the end.
+ long ts = 1000;
+ lifecycleTracker.onUpdate(CREATED, Instant.ofEpochMilli(ts));
+ lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(STABLE, Instant.ofEpochMilli(ts += 1000));
+
+ lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(ROLLING_BACK, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(ROLLED_BACK, Instant.ofEpochMilli(ts += 1000));
+
+ lifecycleTracker.onUpdate(SUSPENDED, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(STABLE, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(STABLE, Instant.ofEpochMilli(ts += 1000));
+
+ // Arguments: metric name, expected number of samples, expected mean in seconds.
+ validateTransition(transitionHistos, "Resume", 1, 4);
+ validateTransition(transitionHistos, "Upgrade", 1, 5);
+ validateTransition(transitionHistos, "Suspend", 3, 1);
+ validateTransition(transitionHistos, "Stabilization", 3, 2);
+ validateTransition(transitionHistos, "Rollback", 1, 2);
+ validateTransition(transitionHistos, "Submission", 5, 1);
+
+ // Arguments: state, expected number of samples, expected sum of samples in seconds.
+ validateTime(timeHistos, CREATED, 1, 1);
+ validateTime(timeHistos, UPGRADING, 4, 3);
+ }
+
+ /**
+ * Checks which histograms the trackers receive under three configurations: the default one,
+ * namespace histograms disabled, and lifecycle metrics disabled altogether.
+ */
+ @Test
+ public void testLifecycleMetricsConfig() {
+ // dep1 and dep2 share namespace ns1, dep3 lives in ns2.
+ var dep1 = TestUtils.buildApplicationCluster();
+ dep1.getMetadata().setNamespace("ns1");
+ dep1.getMetadata().setName("n1");
+ var dep2 = TestUtils.buildApplicationCluster();
+ dep2.getMetadata().setNamespace("ns1");
+ dep2.getMetadata().setName("n2");
+ var dep3 = TestUtils.buildApplicationCluster();
+ dep3.getMetadata().setNamespace("ns2");
+ dep3.getMetadata().setName("n3");
+
+ var conf = new Configuration();
+ var metricManager = TestUtils.createTestMetricManager(conf);
+ var lifeCycleMetrics = metricManager.getLifeCycleMetrics();
+
+ metricManager.onUpdate(dep1);
+ metricManager.onUpdate(dep2);
+ metricManager.onUpdate(dep3);
+
+ var trackers = lifeCycleMetrics.getLifecycleTrackers();
+ var tracker1 = trackers.get(Tuple2.of("ns1", "n1"));
+ var tracker2 = trackers.get(Tuple2.of("ns1", "n2"));
+ var tracker3 = trackers.get(Tuple2.of("ns2", "n3"));
+
+ // Default configuration: trackers of the same namespace are expected to hold the same
+ // histograms, trackers of different namespaces only share the histogram at index 0.
+ assertEquals(tracker1.getStateTimeHistos(), tracker2.getStateTimeHistos());
+ assertEquals(tracker1.getTransitionHistos(), tracker2.getTransitionHistos());
+ assertNotEquals(tracker1.getStateTimeHistos(), tracker3.getStateTimeHistos());
+ assertNotEquals(tracker1.getTransitionHistos(), tracker3.getTransitionHistos());
+ tracker1.getStateTimeHistos()
+ .forEach(
+ (k, l) -> {
+ assertEquals(2, l.size());
+ assertEquals(l.get(0), tracker3.getStateTimeHistos().get(k).get(0));
+ });
+
+ tracker1.getTransitionHistos()
+ .forEach(
+ (k, l) -> {
+ assertEquals(2, l.size());
+ assertEquals(l.get(0), tracker3.getTransitionHistos().get(k).get(0));
+ });
+
+ // Namespace histograms disabled: a single histogram per metric, shared by all trackers.
+ conf.set(
+ KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_NAMESPACE_HISTOGRAMS_ENABLED,
+ false);
+ metricManager = TestUtils.createTestMetricManager(conf);
+ lifeCycleMetrics = metricManager.getLifeCycleMetrics();
+
+ metricManager.onUpdate(dep1);
+ metricManager.onUpdate(dep2);
+ metricManager.onUpdate(dep3);
+
+ trackers = lifeCycleMetrics.getLifecycleTrackers();
+ assertEquals(
+ trackers.get(Tuple2.of("ns1", "n1")).getStateTimeHistos(),
+ trackers.get(Tuple2.of("ns2", "n3")).getStateTimeHistos());
+ assertEquals(
+ trackers.get(Tuple2.of("ns1", "n1")).getTransitionHistos(),
+ trackers.get(Tuple2.of("ns2", "n3")).getTransitionHistos());
+ trackers.get(Tuple2.of("ns1", "n1"))
+ .getStateTimeHistos()
+ .forEach(
+ (k, l) -> {
+ assertEquals(1, l.size());
+ });
+
+ trackers.get(Tuple2.of("ns1", "n1"))
+ .getTransitionHistos()
+ .forEach(
+ (k, l) -> {
+ assertEquals(1, l.size());
+ });
+
+ // Lifecycle metrics disabled: no LifecycleMetrics instance, updates must still not fail.
+ conf.set(KubernetesOperatorMetricOptions.OPERATOR_LIFECYCLE_METRICS_ENABLED, false);
+ metricManager = TestUtils.createTestMetricManager(conf);
+ assertNull(metricManager.getLifeCycleMetrics());
+
+ metricManager.onUpdate(dep1);
+ metricManager.onUpdate(dep2);
+ metricManager.onUpdate(dep3);
+ }
+
+ /** Asserts the sample count and the mean of every histogram registered under the name. */
+ private void validateTransition(
+ Map<String, List<Histogram>> histos, String name, int size, long mean) {
+ histos.get(name)
+ .forEach(
+ h -> {
+ var stat = h.getStatistics();
+ assertEquals(size, stat.size());
+ assertEquals(mean, stat.getMean());
+ });
+ }
+
+ /** Asserts the sample count and the sum of samples of every histogram of the state. */
+ private void validateTime(
+ Map<ResourceLifecycleState, List<Histogram>> histos,
+ ResourceLifecycleState state,
+ int size,
+ long sum) {
+ histos.get(state)
+ .forEach(
+ h -> {
+ var stat = h.getStatistics();
+ assertEquals(size, stat.size());
+ assertEquals(sum, LongStream.of(stat.getValues()).sum());
+ });
+ }
+
+ /** Creates one standalone histogram per distinct tracked transition metric name. */
+ private Map<String, List<Histogram>> initTransitionHistos() {
+ var histos = new ConcurrentHashMap<String, List<Histogram>>();
+ LifecycleMetrics.TRACKED_TRANSITIONS.forEach(
+ t ->
+ histos.computeIfAbsent(
+ t.metricName,
+ name ->
+ List.of(
+ OperatorMetricUtils.createHistogram(
+ FlinkOperatorConfiguration
+ .fromConfiguration(
+ new Configuration())))));
+ return histos;
+ }
+
+ /** Creates one standalone histogram per lifecycle state. */
+ private Map<ResourceLifecycleState, List<Histogram>> initTimeHistos() {
+ var histos = new ConcurrentHashMap<ResourceLifecycleState, List<Histogram>>();
+ for (ResourceLifecycleState state : ResourceLifecycleState.values()) {
+ histos.put(
+ state,
+ List.of(
+ OperatorMetricUtils.createHistogram(
+ FlinkOperatorConfiguration.fromConfiguration(
+ new Configuration()))));
+ }
+ return histos;
+ }
+}