You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/09/29 19:15:48 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-29474] Fix global lifecycle metrics group to include resource type
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 3ae66dbd [FLINK-29474] Fix global lifecycle metrics group to include resource type
3ae66dbd is described below
commit 3ae66dbddd4126a19c1e8f0105027bc370f5e753
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Sep 29 16:48:44 2022 +0200
[FLINK-29474] Fix global lifecycle metrics group to include resource type
---
.../metrics/lifecycle/LifecycleMetrics.java | 8 ++-
.../lifecycle/ResourceLifecycleMetricsTest.java | 61 +++++++++++++++++++---
2 files changed, 61 insertions(+), 8 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
index dde85075..8633a8f1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
@@ -160,7 +160,9 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
name ->
Tuple2.of(
createTransitionHistogram(
- name, operatorMetricGroup),
+ name,
+ operatorMetricGroup.addGroup(
+ cr.getClass().getSimpleName())),
new ConcurrentHashMap<>())));
this.stateTimeMetrics = new ConcurrentHashMap<>();
@@ -168,7 +170,9 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
stateTimeMetrics.put(
state,
Tuple2.of(
- createStateTimeHistogram(state, operatorMetricGroup),
+ createStateTimeHistogram(
+ state,
+ operatorMetricGroup.addGroup(cr.getClass().getSimpleName())),
new ConcurrentHashMap<>()));
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
index 3240dc6c..cc41bc80 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -23,13 +23,16 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.metrics.Histogram;
@@ -52,6 +55,7 @@ import static org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLif
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Test for resource lifecycle metrics. */
public class ResourceLifecycleMetricsTest {
@@ -129,7 +133,7 @@ public class ResourceLifecycleMetricsTest {
lifecycleTracker.onUpdate(UPGRADING, Instant.ofEpochMilli(ts += 1000));
lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
lifecycleTracker.onUpdate(DEPLOYED, Instant.ofEpochMilli(ts += 1000));
- lifecycleTracker.onUpdate(STABLE, Instant.ofEpochMilli(ts += 1000));
+ lifecycleTracker.onUpdate(STABLE, Instant.ofEpochMilli(ts + 1000));
validateTransition(transitionHistos, "Resume", 1, 4);
validateTransition(transitionHistos, "Upgrade", 1, 5);
@@ -225,12 +229,57 @@ public class ResourceLifecycleMetricsTest {
metricManager.onUpdate(dep3);
}
- public static LifecycleMetrics<FlinkDeployment> getLifeCycleMetrics(
- MetricManager<FlinkDeployment> metricManager) {
- for (CustomResourceMetrics<FlinkDeployment> metrics :
- metricManager.getRegisteredMetrics()) {
+ @Test
+ public void testGlobalHistoNames() {
+ var conf = new Configuration();
+ var testingMetricListener = new TestingMetricListener(new Configuration());
+ var deploymentMetricManager =
+ MetricManager.createFlinkDeploymentMetricManager(
+ new FlinkConfigManager(conf), testingMetricListener.getMetricGroup());
+ var deploymentLifecycleMetrics = getLifeCycleMetrics(deploymentMetricManager);
+ deploymentLifecycleMetrics.onUpdate(TestUtils.buildApplicationCluster());
+ testGlobalHistoNames(testingMetricListener, FlinkDeployment.class);
+
+ var sessionJobMetricManager =
+ MetricManager.createFlinkSessionJobMetricManager(
+ new FlinkConfigManager(conf), testingMetricListener.getMetricGroup());
+ var sessionJobLifecycleMetrics = getLifeCycleMetrics(sessionJobMetricManager);
+ sessionJobLifecycleMetrics.onUpdate(TestUtils.buildSessionJob());
+
+ testGlobalHistoNames(testingMetricListener, FlinkSessionJob.class);
+ }
+
+ private void testGlobalHistoNames(TestingMetricListener metricListener, Class<?> resoureClass) {
+ for (var state : ResourceLifecycleState.values()) {
+ assertTrue(
+ metricListener
+ .getHistogram(
+ String.format(
+ metricListener.getMetricId(
+ "%s.Lifecycle.State.%s.TimeSeconds"),
+ resoureClass.getSimpleName(),
+ state))
+ .isPresent());
+ }
+
+ for (var transition : LifecycleMetrics.TRACKED_TRANSITIONS) {
+ assertTrue(
+ metricListener
+ .getHistogram(
+ String.format(
+ metricListener.getMetricId(
+ "%s.Lifecycle.Transition.%s.TimeSeconds"),
+ resoureClass.getSimpleName(),
+ transition.metricName))
+ .isPresent());
+ }
+ }
+
+ public static <T extends AbstractFlinkResource<?, ?>> LifecycleMetrics<T> getLifeCycleMetrics(
+ MetricManager<T> metricManager) {
+ for (CustomResourceMetrics<?> metrics : metricManager.getRegisteredMetrics()) {
if (metrics instanceof LifecycleMetrics) {
- return (LifecycleMetrics<FlinkDeployment>) metrics;
+ return (LifecycleMetrics<T>) metrics;
}
}
return null;