You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/08/05 13:10:24 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28749] Include resource type in KubernetesOperatorResourceNamespaceMetricGroup
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c91ebbd1 [FLINK-28749] Include resource type in KubernetesOperatorResourceNamespaceMetricGroup
c91ebbd1 is described below
commit c91ebbd1ae989749948dc6de69255be70c5b7557
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Fri Aug 5 15:10:19 2022 +0200
[FLINK-28749] Include resource type in KubernetesOperatorResourceNamespaceMetricGroup
---
.../kubernetes_operator_metric_configuration.html | 4 +-
.../operator/metrics/FlinkDeploymentMetrics.java | 6 +-
.../operator/metrics/FlinkSessionJobMetrics.java | 4 +-
.../metrics/KubernetesOperatorMetricGroup.java | 11 +-
.../metrics/KubernetesOperatorMetricOptions.java | 5 +-
.../KubernetesResourceNamespaceMetricGroup.java | 9 +-
.../KubernetesResourceNamespaceScopeFormat.java | 12 ++-
.../metrics/KubernetesResourceScopeFormat.java | 11 +-
.../operator/metrics/OperatorJosdkMetrics.java | 90 +++++++++++-----
.../metrics/lifecycle/LifecycleMetrics.java | 15 +--
.../metrics/FlinkDeploymentMetricsTest.java | 69 ++++++++++---
.../metrics/FlinkSessionJobMetricsTest.java | 12 ++-
.../metrics/KubernetesOperatorMetricGroupTest.java | 26 +++--
.../operator/metrics/OperatorJosdkMetricsTest.java | 113 +++++++++++----------
.../operator/metrics/TestingMetricListener.java | 23 ++++-
15 files changed, 271 insertions(+), 139 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
index 74a02191..57859b88 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
@@ -52,13 +52,13 @@
</tr>
<tr>
<td><h5>metrics.scope.k8soperator.resource</h5></td>
- <td style="word-wrap: break-word;">"<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>"</td>
+ <td style="word-wrap: break-word;">"<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>.<resourcetype>"</td>
<td>String</td>
<td>Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource.</td>
</tr>
<tr>
<td><h5>metrics.scope.k8soperator.resourcens</h5></td>
- <td style="word-wrap: break-word;">"<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>"</td>
+ <td style="word-wrap: break-word;">"<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>.<resourcetype>"</td>
<td>String</td>
<td>Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource namespace.</td>
</tr>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
index 0c8f3826..5ec4f42f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetrics.java
@@ -32,7 +32,6 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
private final Configuration configuration;
private final Map<String, Map<JobManagerDeploymentStatus, Set<String>>> deployments =
new ConcurrentHashMap<>();
- public static final String DEPLOYMENT_GROUP_NAME = FlinkDeployment.class.getSimpleName();
public static final String STATUS_GROUP_NAME = "JmDeploymentStatus";
public static final String COUNTER_NAME = "Count";
@@ -68,8 +67,7 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
private void initNamespaceDeploymentCounts(String ns) {
parentMetricGroup
- .createResourceNamespaceGroup(configuration, ns)
- .addGroup(DEPLOYMENT_GROUP_NAME)
+ .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
.gauge(
COUNTER_NAME,
() -> deployments.get(ns).values().stream().mapToInt(Set::size).sum());
@@ -78,7 +76,7 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
private void initNamespaceStatusCounts(String ns) {
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
parentMetricGroup
- .createResourceNamespaceGroup(configuration, ns)
+ .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
.addGroup(STATUS_GROUP_NAME)
.addGroup(status.toString())
.gauge(COUNTER_NAME, () -> deployments.get(ns).get(status).size());
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
index 1d099519..cf43254b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetrics.java
@@ -30,7 +30,6 @@ public class FlinkSessionJobMetrics implements CustomResourceMetrics<FlinkSessio
private final KubernetesOperatorMetricGroup parentMetricGroup;
private final Configuration configuration;
private final Map<String, Set<String>> sessionJobs = new ConcurrentHashMap<>();
- public static final String METRIC_GROUP_NAME = FlinkSessionJob.class.getSimpleName();
public static final String COUNTER_NAME = "Count";
public FlinkSessionJobMetrics(
@@ -62,8 +61,7 @@ public class FlinkSessionJobMetrics implements CustomResourceMetrics<FlinkSessio
private void initNamespaceSessionJobCounts(String ns) {
parentMetricGroup
- .createResourceNamespaceGroup(configuration, ns)
- .addGroup(METRIC_GROUP_NAME)
+ .createResourceNamespaceGroup(configuration, FlinkSessionJob.class, ns)
.gauge(COUNTER_NAME, () -> sessionJobs.get(ns).size());
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
index a410d6f8..210af978 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -46,13 +47,17 @@ public class KubernetesOperatorMetricGroup extends AbstractMetricGroup<AbstractM
}
public KubernetesResourceNamespaceMetricGroup createResourceNamespaceGroup(
- Configuration config, String resourceNs) {
+ Configuration config,
+ Class<? extends AbstractFlinkResource> resourceClass,
+ String resourceNs) {
+ var resourceType = resourceClass.getSimpleName();
return new KubernetesResourceNamespaceMetricGroup(
registry,
this,
KubernetesResourceNamespaceScopeFormat.fromConfig(config)
- .formatScope(namespace, name, hostname, resourceNs),
- resourceNs);
+ .formatScope(namespace, name, hostname, resourceNs, resourceType),
+ resourceNs,
+ resourceType);
}
public static KubernetesOperatorMetricGroup create(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index ae014b9f..9da0b360 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -82,7 +82,8 @@ public class KubernetesOperatorMetricOptions {
public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCENS =
ConfigOptions.key("metrics.scope.k8soperator.resourcens")
.stringType()
- .defaultValue("<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>")
+ .defaultValue(
+ "<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>.<resourcetype>")
.withDescription(
"Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource namespace.");
@@ -90,7 +91,7 @@ public class KubernetesOperatorMetricOptions {
ConfigOptions.key("metrics.scope.k8soperator.resource")
.stringType()
.defaultValue(
- "<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>")
+ "<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>.<resourcetype>")
.withDescription(
"Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource.");
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java
index bd6f8e22..b3dd9513 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java
@@ -30,14 +30,17 @@ public class KubernetesResourceNamespaceMetricGroup
extends AbstractMetricGroup<KubernetesOperatorMetricGroup> {
private final String resourceNs;
+ private final String resourceType;
protected KubernetesResourceNamespaceMetricGroup(
MetricRegistry registry,
KubernetesOperatorMetricGroup parent,
String[] scope,
- String resourceNs) {
+ String resourceNs,
+ String resourceType) {
super(registry, scope, parent);
this.resourceNs = resourceNs;
+ this.resourceType = resourceType;
}
public KubernetesResourceMetricGroup createResourceNamespaceGroup(
@@ -51,13 +54,15 @@ public class KubernetesResourceNamespaceMetricGroup
parent.name,
parent.hostname,
resourceNs,
- resourceName),
+ resourceName,
+ resourceType),
resourceName);
}
@Override
protected final void putVariables(Map<String, String> variables) {
variables.put(KubernetesResourceNamespaceScopeFormat.RESOURCE_NS, resourceNs);
+ variables.put(KubernetesResourceNamespaceScopeFormat.RESOURCE_TYPE, resourceType);
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java
index 17b6ffe1..d421832f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java
@@ -28,14 +28,20 @@ import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorSco
public class KubernetesResourceNamespaceScopeFormat extends ScopeFormat {
public static final String RESOURCE_NS = asVariable("resourcens");
+ public static final String RESOURCE_TYPE = asVariable("resourcetype");
public KubernetesResourceNamespaceScopeFormat(String format) {
- super(format, null, new String[] {NAMESPACE, NAME, SCOPE_HOST, RESOURCE_NS});
+ super(format, null, new String[] {NAMESPACE, NAME, SCOPE_HOST, RESOURCE_NS, RESOURCE_TYPE});
}
- public String[] formatScope(String namespace, String name, String hostname, String resourceNs) {
+ public String[] formatScope(
+ String namespace,
+ String name,
+ String hostname,
+ String resourceNs,
+ String resourceType) {
final String[] template = copyTemplate();
- final String[] values = {namespace, name, hostname, resourceNs};
+ final String[] values = {namespace, name, hostname, resourceNs, resourceType};
return bindVariables(template, values);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java
index 7f05ab63..0c93e9aa 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java
@@ -24,6 +24,7 @@ import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMet
import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorScopeFormat.NAME;
import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorScopeFormat.NAMESPACE;
import static org.apache.flink.kubernetes.operator.metrics.KubernetesResourceNamespaceScopeFormat.RESOURCE_NS;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesResourceNamespaceScopeFormat.RESOURCE_TYPE;
/** Format for metrics. * */
public class KubernetesResourceScopeFormat extends ScopeFormat {
@@ -31,7 +32,10 @@ public class KubernetesResourceScopeFormat extends ScopeFormat {
public static final String RESOURCE = asVariable("resourcename");
public KubernetesResourceScopeFormat(String format) {
- super(format, null, new String[] {NAMESPACE, NAME, SCOPE_HOST, RESOURCE_NS, RESOURCE});
+ super(
+ format,
+ null,
+ new String[] {NAMESPACE, NAME, SCOPE_HOST, RESOURCE_NS, RESOURCE, RESOURCE_TYPE});
}
public String[] formatScope(
@@ -39,9 +43,10 @@ public class KubernetesResourceScopeFormat extends ScopeFormat {
String name,
String hostname,
String resourceNs,
- String resourceName) {
+ String resourceName,
+ String resourceType) {
final String[] template = copyTemplate();
- final String[] values = {namespace, name, hostname, resourceNs, resourceName};
+ final String[] values = {namespace, name, hostname, resourceNs, resourceName, resourceType};
return bindVariables(template, values);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
index fa09fb97..7c2e85b7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
@@ -18,8 +18,9 @@
package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
-import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
@@ -28,15 +29,19 @@ import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
+import io.javaoperatorsdk.operator.api.reconciler.Constants;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.processing.GroupVersionKind;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
+import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -62,13 +67,6 @@ public class OperatorJosdkMetrics implements Metrics {
private final Map<List<String>, Histogram> histograms = new ConcurrentHashMap<>();
private final Map<List<String>, Counter> counters = new ConcurrentHashMap<>();
- private static final Map<String, String> CONTROLLERS =
- Map.of(
- FlinkDeploymentController.class.getSimpleName().toLowerCase(),
- "FlinkDeployment",
- FlinkSessionJobController.class.getSimpleName().toLowerCase(),
- "FlinkSessionJob");
-
public OperatorJosdkMetrics(
KubernetesOperatorMetricGroup operatorMetricGroup, FlinkConfigManager configManager) {
this.operatorMetricGroup = operatorMetricGroup;
@@ -90,12 +88,13 @@ public class OperatorJosdkMetrics implements Metrics {
}
@Override
- public void receivedEvent(Event event) {
+ public void receivedEvent(Event event, Map<String, Object> metadata) {
if (event instanceof ResourceEvent) {
var action = ((ResourceEvent) event).getAction();
- counter(getResourceMg(event.getRelatedCustomResourceID()), RESOURCE, EVENT).inc();
+ counter(getResourceMg(event.getRelatedCustomResourceID(), metadata), RESOURCE, EVENT)
+ .inc();
counter(
- getResourceMg(event.getRelatedCustomResourceID()),
+ getResourceMg(event.getRelatedCustomResourceID(), metadata),
RESOURCE,
EVENT,
action.name())
@@ -104,32 +103,34 @@ public class OperatorJosdkMetrics implements Metrics {
}
@Override
- public void cleanupDoneFor(ResourceID resourceID) {
- counter(getResourceMg(resourceID), RECONCILIATION, "cleanup").inc();
+ public void cleanupDoneFor(ResourceID resourceID, Map<String, Object> metadata) {
+ counter(getResourceMg(resourceID, metadata), RECONCILIATION, "cleanup").inc();
}
@Override
- public void reconcileCustomResource(ResourceID resourceID, RetryInfo retryInfoNullable) {
- counter(getResourceMg(resourceID), RECONCILIATION).inc();
+ public void reconcileCustomResource(
+ ResourceID resourceID, RetryInfo retryInfoNullable, Map<String, Object> metadata) {
+ counter(getResourceMg(resourceID, metadata), RECONCILIATION).inc();
if (retryInfoNullable != null) {
- counter(getResourceMg(resourceID), RECONCILIATION, "retries").inc();
+ counter(getResourceMg(resourceID, metadata), RECONCILIATION, "retries").inc();
}
}
@Override
- public void finishedReconciliation(ResourceID resourceID) {
- counter(getResourceMg(resourceID), RECONCILIATION, "finished").inc();
+ public void finishedReconciliation(ResourceID resourceID, Map<String, Object> metadata) {
+ counter(getResourceMg(resourceID, metadata), RECONCILIATION, "finished").inc();
}
@Override
- public void failedReconciliation(ResourceID resourceID, Exception exception) {
- counter(getResourceMg(resourceID), RECONCILIATION, "failed").inc();
+ public void failedReconciliation(
+ ResourceID resourceID, Exception exception, Map<String, Object> metadata) {
+ counter(getResourceMg(resourceID, metadata), RECONCILIATION, "failed").inc();
}
@Override
public <T extends Map<?, ?>> T monitorSizeOf(T map, String name) {
- operatorMetricGroup.addGroup(name).gauge("size", map::size);
+ operatorMetricGroup.addGroup(OPERATOR_SDK_GROUP).addGroup(name).gauge("size", map::size);
return map;
}
@@ -138,7 +139,9 @@ public class OperatorJosdkMetrics implements Metrics {
return histograms.computeIfAbsent(
groups,
k -> {
- var group = operatorMetricGroup.addGroup(OPERATOR_SDK_GROUP);
+ var group =
+ getResourceNsMg(execution.resourceID(), execution.metadata())
+ .addGroup(OPERATOR_SDK_GROUP);
for (String mg : groups) {
group = group.addGroup(mg);
}
@@ -151,8 +154,7 @@ public class OperatorJosdkMetrics implements Metrics {
}
private List<String> getHistoGroups(ControllerExecution<?> execution, String name) {
- return List.of(
- CONTROLLERS.get(execution.controllerName().toLowerCase()), execution.name(), name);
+ return List.of(execution.name(), name);
}
private long toSeconds(long startTime) {
@@ -176,20 +178,52 @@ public class OperatorJosdkMetrics implements Metrics {
});
}
- private KubernetesResourceNamespaceMetricGroup getResourceNsMg(ResourceID resourceID) {
+ private KubernetesResourceNamespaceMetricGroup getResourceNsMg(
+ ResourceID resourceID, Map<String, Object> metadata) {
+ Class<? extends AbstractFlinkResource<?, ?>> resourceClass =
+ getResourceClass(metadata)
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Unknown resource kind for " + resourceID));
+
return resourceNsMetricGroups.computeIfAbsent(
resourceID,
rid ->
operatorMetricGroup.createResourceNamespaceGroup(
configManager.getDefaultConfig(),
+ resourceClass,
rid.getNamespace().orElse("default")));
}
- private KubernetesResourceMetricGroup getResourceMg(ResourceID resourceID) {
+ @NotNull
+ private Optional<Class<? extends AbstractFlinkResource<?, ?>>> getResourceClass(
+ Map<String, Object> metadata) {
+ var resourceGvk = (GroupVersionKind) metadata.get(Constants.RESOURCE_GVK_KEY);
+
+ if (resourceGvk == null) {
+ return Optional.empty();
+ }
+
+ Class<? extends AbstractFlinkResource<?, ?>> resourceClass;
+
+ if (resourceGvk.kind.equals(FlinkDeployment.class.getSimpleName())) {
+ resourceClass = FlinkDeployment.class;
+ } else if (resourceGvk.kind.equals(FlinkSessionJob.class.getSimpleName())) {
+ resourceClass = FlinkSessionJob.class;
+ } else {
+ return Optional.empty();
+ }
+
+ return Optional.of(resourceClass);
+ }
+
+ private KubernetesResourceMetricGroup getResourceMg(
+ ResourceID resourceID, Map<String, Object> metadata) {
return resourceMetricGroups.computeIfAbsent(
resourceID,
rid ->
- getResourceNsMg(rid)
+ getResourceNsMg(rid, metadata)
.createResourceNamespaceGroup(
configManager.getDefaultConfig(), rid.getName()));
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
index 872e749c..dde85075 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java
@@ -78,7 +78,7 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
private Map<String, Tuple2<Histogram, Map<String, Histogram>>> transitionMetrics;
private Map<ResourceLifecycleState, Tuple2<Histogram, Map<String, Histogram>>> stateTimeMetrics;
- private Function<MetricGroup, MetricGroup> metricGroupFunction;
+ private Function<MetricGroup, MetricGroup> metricGroupFunction = mg -> mg.addGroup("Lifecycle");
public LifecycleMetrics(
FlinkConfigManager configManager, KubernetesOperatorMetricGroup operatorMetricGroup) {
@@ -106,7 +106,7 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
private ResourceLifecycleMetricTracker getLifecycleMetricTracker(CR cr) {
init(cr);
- createNamespaceStateCountIfMissing(cr.getMetadata().getNamespace());
+ createNamespaceStateCountIfMissing(cr);
return lifecycleTrackers.computeIfAbsent(
Tuple2.of(cr.getMetadata().getNamespace(), cr.getMetadata().getName()),
k -> {
@@ -123,7 +123,8 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
});
}
- private void createNamespaceStateCountIfMissing(String namespace) {
+ private void createNamespaceStateCountIfMissing(CR cr) {
+ var namespace = cr.getMetadata().getNamespace();
if (!namespaces.add(namespace)) {
return;
}
@@ -131,7 +132,7 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
MetricGroup lifecycleGroup =
metricGroupFunction.apply(
operatorMetricGroup.createResourceNamespaceGroup(
- configManager.getDefaultConfig(), namespace));
+ configManager.getDefaultConfig(), cr.getClass(), namespace));
for (ResourceLifecycleState state : ResourceLifecycleState.values()) {
lifecycleGroup
.addGroup("State")
@@ -150,8 +151,6 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
if (transitionMetrics != null) {
return;
}
- this.metricGroupFunction =
- mg -> mg.addGroup(cr.getClass().getSimpleName()).addGroup("Lifecycle");
this.transitionMetrics = new ConcurrentHashMap<>();
TRACKED_TRANSITIONS.forEach(
@@ -192,6 +191,8 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
.createResourceNamespaceGroup(
configManager
.getDefaultConfig(),
+ cr
+ .getClass(),
ns))))
: List.of(t.f0)));
return histos;
@@ -215,6 +216,8 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>
.createResourceNamespaceGroup(
configManager
.getDefaultConfig(),
+ cr
+ .getClass(),
ns))))
: List.of(t.f0)));
return histos;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
index 3c368fef..81624a57 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java
@@ -27,7 +27,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.COUNTER_NAME;
-import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.DEPLOYMENT_GROUP_NAME;
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATUS_GROUP_NAME;
import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -55,12 +54,16 @@ public class FlinkDeploymentMetricsTest {
var deployment2 = TestUtils.buildApplicationCluster("deployment2", namespace);
var counterId =
- listener.getNamespaceMetricId(namespace, DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+ listener.getNamespaceMetricId(FlinkDeployment.class, namespace, COUNTER_NAME);
assertTrue(listener.getGauge(counterId).isEmpty());
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
var statusId =
listener.getNamespaceMetricId(
- namespace, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+ FlinkDeployment.class,
+ namespace,
+ STATUS_GROUP_NAME,
+ status.name(),
+ COUNTER_NAME);
assertTrue(listener.getGauge(statusId).isEmpty());
}
@@ -75,7 +78,11 @@ public class FlinkDeploymentMetricsTest {
var statusId =
listener.getNamespaceMetricId(
- namespace, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+ FlinkDeployment.class,
+ namespace,
+ STATUS_GROUP_NAME,
+ status.name(),
+ COUNTER_NAME);
assertEquals(2, listener.getGauge(statusId).get().getValue());
}
@@ -85,7 +92,11 @@ public class FlinkDeploymentMetricsTest {
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
var statusId =
listener.getNamespaceMetricId(
- namespace, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+ FlinkDeployment.class,
+ namespace,
+ STATUS_GROUP_NAME,
+ status.name(),
+ COUNTER_NAME);
assertEquals(0, listener.getGauge(statusId).get().getValue());
}
}
@@ -98,18 +109,26 @@ public class FlinkDeploymentMetricsTest {
var deployment2 = TestUtils.buildApplicationCluster("deployment", namespace2);
var counterId1 =
- listener.getNamespaceMetricId(namespace1, DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+ listener.getNamespaceMetricId(FlinkDeployment.class, namespace1, COUNTER_NAME);
var counterId2 =
- listener.getNamespaceMetricId(namespace2, DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+ listener.getNamespaceMetricId(FlinkDeployment.class, namespace2, COUNTER_NAME);
assertTrue(listener.getGauge(counterId1).isEmpty());
assertTrue(listener.getGauge(counterId2).isEmpty());
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
var statusId1 =
listener.getNamespaceMetricId(
- namespace1, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+ FlinkDeployment.class,
+ namespace1,
+ STATUS_GROUP_NAME,
+ status.name(),
+ COUNTER_NAME);
var statusId2 =
listener.getNamespaceMetricId(
- namespace2, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+ FlinkDeployment.class,
+ namespace2,
+ STATUS_GROUP_NAME,
+ status.name(),
+ COUNTER_NAME);
assertTrue(listener.getGauge(statusId1).isEmpty());
assertTrue(listener.getGauge(statusId2).isEmpty());
}
@@ -125,10 +144,18 @@ public class FlinkDeploymentMetricsTest {
metricManager.onUpdate(deployment2);
var statusId1 =
listener.getNamespaceMetricId(
- namespace1, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+ FlinkDeployment.class,
+ namespace1,
+ STATUS_GROUP_NAME,
+ status.name(),
+ COUNTER_NAME);
var statusId2 =
listener.getNamespaceMetricId(
- namespace2, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+ FlinkDeployment.class,
+ namespace2,
+ STATUS_GROUP_NAME,
+ status.name(),
+ COUNTER_NAME);
assertEquals(1, listener.getGauge(statusId1).get().getValue());
assertEquals(1, listener.getGauge(statusId2).get().getValue());
}
@@ -143,10 +170,18 @@ public class FlinkDeploymentMetricsTest {
deployment2.getStatus().setJobManagerDeploymentStatus(status);
var statusId1 =
listener.getNamespaceMetricId(
- namespace1, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+ FlinkDeployment.class,
+ namespace1,
+ STATUS_GROUP_NAME,
+ status.name(),
+ COUNTER_NAME);
var statusId2 =
listener.getNamespaceMetricId(
- namespace2, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+ FlinkDeployment.class,
+ namespace2,
+ STATUS_GROUP_NAME,
+ status.name(),
+ COUNTER_NAME);
assertEquals(0, listener.getGauge(statusId1).get().getValue());
assertEquals(0, listener.getGauge(statusId2).get().getValue());
}
@@ -166,13 +201,17 @@ public class FlinkDeploymentMetricsTest {
var deployment = TestUtils.buildApplicationCluster("deployment", namespace);
var counterId =
- listener.getNamespaceMetricId(namespace, DEPLOYMENT_GROUP_NAME, COUNTER_NAME);
+ listener.getNamespaceMetricId(FlinkDeployment.class, namespace, COUNTER_NAME);
metricManager.onUpdate(deployment);
assertTrue(listener.getGauge(counterId).isEmpty());
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
var statusId =
listener.getNamespaceMetricId(
- namespace, STATUS_GROUP_NAME, status.name(), COUNTER_NAME);
+ FlinkDeployment.class,
+ namespace,
+ STATUS_GROUP_NAME,
+ status.name(),
+ COUNTER_NAME);
assertTrue(listener.getGauge(statusId).isEmpty());
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
index 284d6f42..4c813c11 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkSessionJobMetricsTest.java
@@ -26,7 +26,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.apache.flink.kubernetes.operator.metrics.FlinkSessionJobMetrics.COUNTER_NAME;
-import static org.apache.flink.kubernetes.operator.metrics.FlinkSessionJobMetrics.METRIC_GROUP_NAME;
import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -51,7 +50,8 @@ public class FlinkSessionJobMetricsTest {
var namespace = "test-ns";
var job1 = TestUtils.buildSessionJob("job1", namespace);
var job2 = TestUtils.buildSessionJob("job2", namespace);
- var metricId = listener.getNamespaceMetricId(namespace, METRIC_GROUP_NAME, COUNTER_NAME);
+ var metricId =
+ listener.getNamespaceMetricId(FlinkSessionJob.class, namespace, COUNTER_NAME);
assertTrue(listener.getGauge(namespace).isEmpty());
metricManager.onUpdate(job1);
@@ -78,8 +78,10 @@ public class FlinkSessionJobMetricsTest {
var job1 = TestUtils.buildSessionJob("job", namespace1);
var job2 = TestUtils.buildSessionJob("job", namespace2);
- var metricId1 = listener.getNamespaceMetricId(namespace1, METRIC_GROUP_NAME, COUNTER_NAME);
- var metricId2 = listener.getNamespaceMetricId(namespace2, METRIC_GROUP_NAME, COUNTER_NAME);
+ var metricId1 =
+ listener.getNamespaceMetricId(FlinkSessionJob.class, namespace1, COUNTER_NAME);
+ var metricId2 =
+ listener.getNamespaceMetricId(FlinkSessionJob.class, namespace2, COUNTER_NAME);
assertTrue(listener.getGauge(metricId1).isEmpty());
assertTrue(listener.getGauge(metricId2).isEmpty());
@@ -118,8 +120,8 @@ public class FlinkSessionJobMetricsTest {
var metricId =
listener.getNamespaceMetricId(
+ FlinkSessionJob.class,
flinkSessionJob.getMetadata().getNamespace(),
- METRIC_GROUP_NAME,
COUNTER_NAME);
assertTrue(listener.getGauge(metricId).isEmpty());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
index dc2f28d5..35066040 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
@@ -18,13 +18,14 @@
package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
-
import org.junit.jupiter.api.Test;
+import java.util.Map;
+
import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.SCOPE_NAMING_KUBERNETES_OPERATOR;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -53,7 +54,7 @@ public class KubernetesOperatorMetricGroupTest {
group.getMetricIdentifier("test"));
assertEquals(
- ImmutableMap.of(
+ Map.of(
"<host>",
"localhost",
"<namespace>",
@@ -86,7 +87,7 @@ public class KubernetesOperatorMetricGroupTest {
group.getMetricIdentifier("test"));
assertEquals(
- ImmutableMap.of(
+ Map.of(
"<host>",
"localhost",
"<namespace>",
@@ -110,11 +111,13 @@ public class KubernetesOperatorMetricGroupTest {
"flink-kubernetes-operator",
"localhost");
- var namespaceGroup = operatorMetricGroup.createResourceNamespaceGroup(configuration, "rns");
+ var namespaceGroup =
+ operatorMetricGroup.createResourceNamespaceGroup(
+ configuration, FlinkSessionJob.class, "rns");
var resourceGroup = namespaceGroup.createResourceNamespaceGroup(configuration, "rn");
assertEquals(
- ImmutableMap.of(
+ Map.of(
"<host>",
"localhost",
"<namespace>",
@@ -122,10 +125,13 @@ public class KubernetesOperatorMetricGroupTest {
"<name>",
"flink-kubernetes-operator",
"<resourcens>",
- "rns"),
+ "rns",
+ "<resourcetype>",
+ "FlinkSessionJob"),
namespaceGroup.getAllVariables());
+
assertEquals(
- ImmutableMap.of(
+ Map.of(
"<host>",
"localhost",
"<namespace>",
@@ -135,7 +141,9 @@ public class KubernetesOperatorMetricGroupTest {
"<resourcens>",
"rns",
"<resourcename>",
- "rn"),
+ "rn",
+ "<resourcetype>",
+ "FlinkSessionJob"),
resourceGroup.getAllVariables());
registry.shutdown().get();
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
index e38cffee..49cd3d44 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
@@ -18,16 +18,11 @@
package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
@@ -39,7 +34,6 @@ import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEv
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -50,40 +44,31 @@ public class OperatorJosdkMetricsTest {
private static final ResourceID resourceId = new ResourceID("testname", "testns");
private static final String controllerName = FlinkDeploymentController.class.getSimpleName();
- private static final String resourcePrefix =
- "testhost.k8soperator.flink-operator-test.testopname.resource.testns.testname.JOSDK.";
- private static final String systemPrefix =
- "testhost.k8soperator.flink-operator-test.testopname.system.";
- private static final String executionPrefix = systemPrefix + "JOSDK.FlinkDeployment.";
+ private static final Map<String, Object> metadata =
+ Map.of(Constants.RESOURCE_GVK_KEY, GroupVersionKind.gvkFor(FlinkDeployment.class));
- private Map<String, Metric> metrics = new HashMap<>();
private OperatorJosdkMetrics operatorMetrics;
+ private TestingMetricListener listener;
@BeforeEach
public void setup() {
- TestingMetricRegistry registry =
- TestingMetricRegistry.builder()
- .setDelimiter(".".charAt(0))
- .setRegisterConsumer(
- (metric, name, group) ->
- metrics.put(group.getMetricIdentifier(name), metric))
- .build();
+ listener = new TestingMetricListener(new Configuration());
operatorMetrics =
new OperatorJosdkMetrics(
- TestUtils.createTestMetricGroup(registry, new Configuration()),
- new FlinkConfigManager(new Configuration()));
+ listener.getMetricGroup(), new FlinkConfigManager(new Configuration()));
}
@Test
public void testTimeControllerExecution() throws Exception {
var successExecution = new TestingExecutionBase<>();
operatorMetrics.timeControllerExecution(successExecution);
- assertEquals(1, metrics.size());
+
+ assertEquals(1, listener.size());
assertEquals(1, getHistogram("reconcile", "resource").getCount());
assertEquals(1, getHistogram("reconcile", "resource").getStatistics().getMin());
operatorMetrics.timeControllerExecution(successExecution);
operatorMetrics.timeControllerExecution(successExecution);
- assertEquals(1, metrics.size());
+ assertEquals(1, listener.size());
assertEquals(3, getHistogram("reconcile", "resource").getCount());
assertEquals(1, getHistogram("reconcile", "resource").getStatistics().getMin());
@@ -104,7 +89,7 @@ public class OperatorJosdkMetricsTest {
operatorMetrics.timeControllerExecution(failureExecution);
fail();
} catch (Exception e) {
- assertEquals(2, metrics.size());
+ assertEquals(2, listener.size());
assertEquals(1, getHistogram("cleanup", "failed").getCount());
assertEquals(1, getHistogram("cleanup", "failed").getStatistics().getMin());
}
@@ -117,7 +102,7 @@ public class OperatorJosdkMetricsTest {
operatorMetrics.timeControllerExecution(failureExecution);
fail();
} catch (Exception e) {
- assertEquals(2, metrics.size());
+ assertEquals(2, listener.size());
assertEquals(3, getHistogram("cleanup", "failed").getCount());
assertEquals(1, getHistogram("cleanup", "failed").getStatistics().getMin());
}
@@ -125,16 +110,16 @@ public class OperatorJosdkMetricsTest {
@Test
public void testMetrics() {
- operatorMetrics.failedReconciliation(resourceId, null);
- assertEquals(1, metrics.size());
+ operatorMetrics.failedReconciliation(resourceId, null, metadata);
+ assertEquals(1, listener.size());
assertEquals(1, getCount("Reconciliation.failed"));
- operatorMetrics.failedReconciliation(resourceId, null);
- operatorMetrics.failedReconciliation(resourceId, null);
- assertEquals(1, metrics.size());
+ operatorMetrics.failedReconciliation(resourceId, null, metadata);
+ operatorMetrics.failedReconciliation(resourceId, null, metadata);
+ assertEquals(1, listener.size());
assertEquals(3, getCount("Reconciliation.failed"));
- operatorMetrics.reconcileCustomResource(resourceId, null);
- assertEquals(2, metrics.size());
+ operatorMetrics.reconcileCustomResource(resourceId, null, metadata);
+ assertEquals(2, listener.size());
assertEquals(1, getCount("Reconciliation"));
operatorMetrics.reconcileCustomResource(
@@ -149,45 +134,70 @@ public class OperatorJosdkMetricsTest {
public boolean isLastAttempt() {
return false;
}
- });
- assertEquals(3, metrics.size());
+ },
+ metadata);
+ assertEquals(3, listener.size());
assertEquals(2, getCount("Reconciliation"));
assertEquals(1, getCount("Reconciliation.retries"));
- operatorMetrics.receivedEvent(new ResourceEvent(ResourceAction.ADDED, resourceId, null));
- assertEquals(5, metrics.size());
+ operatorMetrics.receivedEvent(
+ new ResourceEvent(ResourceAction.ADDED, resourceId, null), metadata);
+ assertEquals(5, listener.size());
assertEquals(1, getCount("Resource.Event"));
assertEquals(1, getCount("Resource.Event.ADDED"));
- operatorMetrics.cleanupDoneFor(resourceId);
- assertEquals(6, metrics.size());
+ operatorMetrics.cleanupDoneFor(resourceId, metadata);
+ assertEquals(6, listener.size());
assertEquals(1, getCount("Reconciliation.cleanup"));
- operatorMetrics.finishedReconciliation(resourceId);
- assertEquals(7, metrics.size());
+ operatorMetrics.finishedReconciliation(resourceId, metadata);
+ assertEquals(7, listener.size());
assertEquals(1, getCount("Reconciliation.finished"));
operatorMetrics.monitorSizeOf(Map.of("a", "b", "c", "d"), "mymap");
- assertEquals(8, metrics.size());
- assertEquals(2, ((Gauge<Integer>) metrics.get(systemPrefix + "mymap.size")).getValue());
+ assertEquals(8, listener.size());
+ assertEquals(
+ 2,
+ listener.getGauge(listener.getMetricId("JOSDK", "mymap", "size")).get().getValue());
- operatorMetrics.reconcileCustomResource(new ResourceID("other", "otherns"), null);
- assertEquals(9, metrics.size());
+ operatorMetrics.reconcileCustomResource(new ResourceID("other", "otherns"), null, metadata);
+ assertEquals(9, listener.size());
assertEquals(
1,
- ((Counter)
- metrics.get(
- "testhost.k8soperator.flink-operator-test.testopname.resource.otherns.other.JOSDK.Reconciliation.Count"))
+ listener.getCounter(
+ listener.getResourceMetricId(
+ FlinkDeployment.class,
+ "otherns",
+ "other",
+ "JOSDK",
+ "Reconciliation",
+ "Count"))
+ .get()
.getCount());
}
private Histogram getHistogram(String... names) {
- return ((Histogram)
- metrics.get(executionPrefix + String.join(".", names) + ".TimeSeconds"));
+ return listener.getHistogram(
+ listener.getNamespaceMetricId(
+ FlinkDeployment.class,
+ "testns",
+ "JOSDK",
+ String.join(".", names),
+ "TimeSeconds"))
+ .get();
}
private long getCount(String name) {
- return ((Counter) metrics.get(resourcePrefix + name + ".Count")).getCount();
+ return listener.getCounter(
+ listener.getResourceMetricId(
+ FlinkDeployment.class,
+ "testns",
+ "testname",
+ "JOSDK",
+ name,
+ "Count"))
+ .get()
+ .getCount();
}
private static class TestingExecutionBase<T> implements Metrics.ControllerExecution<T> {
@@ -208,8 +218,7 @@ public class OperatorJosdkMetricsTest {
@Override
public Map<String, Object> metadata() {
- return Map.of(
- Constants.RESOURCE_GVK_KEY, GroupVersionKind.gvkFor(FlinkDeployment.class));
+ return metadata;
}
@Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java
index 294822d2..3ce5f21b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/TestingMetricListener.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
@@ -83,9 +84,27 @@ public class TestingMetricListener {
return metricGroup.getMetricIdentifier(String.join(DELIMITER, identifiers));
}
- public String getNamespaceMetricId(String resourceNs, String... identifiers) {
+ public String getNamespaceMetricId(
+ Class<? extends AbstractFlinkResource<?, ?>> resourceClass,
+ String resourceNs,
+ String... identifiers) {
return metricGroup
- .createResourceNamespaceGroup(configuration, resourceNs)
+ .createResourceNamespaceGroup(configuration, resourceClass, resourceNs)
.getMetricIdentifier(String.join(DELIMITER, identifiers));
}
+
+ public String getResourceMetricId(
+ Class<? extends AbstractFlinkResource<?, ?>> resourceClass,
+ String resourceNs,
+ String resourceName,
+ String... identifiers) {
+ return metricGroup
+ .createResourceNamespaceGroup(configuration, resourceClass, resourceNs)
+ .createResourceNamespaceGroup(configuration, resourceName)
+ .getMetricIdentifier(String.join(DELIMITER, identifiers));
+ }
+
+ public int size() {
+ return metrics.size();
+ }
}