You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/08 12:40:47 UTC
[flink-kubernetes-operator] 01/03: [FLINK-28297] Improve operator metric groups
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 1b3acc23868f9ad09ea73e7556a7b31176496783
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Jul 7 10:20:26 2022 +0200
[FLINK-28297] Improve operator metric groups
---
.../flink/kubernetes/operator/FlinkOperator.java | 12 ++--
.../metrics/KubernetesOperatorMetricGroup.java | 24 ++++---
.../metrics/KubernetesOperatorMetricOptions.java | 18 +++++-
...oup.java => KubernetesResourceMetricGroup.java} | 47 ++++----------
...=> KubernetesResourceNamespaceMetricGroup.java} | 55 +++++++---------
.../KubernetesResourceNamespaceScopeFormat.java | 46 ++++++++++++++
.../metrics/KubernetesResourceScopeFormat.java | 52 +++++++++++++++
.../kubernetes/operator/metrics/MetricManager.java | 19 +++---
.../operator/metrics/OperatorMetricUtils.java | 2 +-
.../flink/kubernetes/operator/TestUtils.java | 28 +++++++++
.../kubernetes/operator/TestingStatusRecorder.java | 5 +-
.../TestingFlinkDeploymentController.java | 5 +-
.../listener/FlinkResourceListenerTest.java | 5 +-
.../metrics/KubernetesOperatorMetricGroupTest.java | 48 +++++++++++++-
.../operator/utils/StatusRecorderTest.java | 73 ++++++++++++++--------
15 files changed, 309 insertions(+), 130 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 590af2c..419702d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.listener.ListenerUtils;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
@@ -40,7 +41,6 @@ import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
-import org.apache.flink.metrics.MetricGroup;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -70,7 +70,7 @@ public class FlinkOperator {
private final FlinkConfigManager configManager;
private final Set<FlinkResourceValidator> validators;
private final Set<RegisteredController> registeredControllers = new HashSet<>();
- private final MetricGroup metricGroup;
+ private final KubernetesOperatorMetricGroup metricGroup;
private final Collection<FlinkResourceListener> listeners;
public FlinkOperator(@Nullable Configuration conf) {
@@ -114,7 +114,9 @@ public class FlinkOperator {
private void registerDeploymentController() {
var statusRecorder =
StatusRecorder.<FlinkDeploymentStatus>create(
- client, new MetricManager<>(metricGroup), listeners);
+ client,
+ new MetricManager<>(metricGroup, configManager.getDefaultConfig()),
+ listeners);
var eventRecorder = EventRecorder.create(client, listeners);
var reconcilerFactory =
new ReconcilerFactory(
@@ -137,7 +139,9 @@ public class FlinkOperator {
var eventRecorder = EventRecorder.create(client, listeners);
var statusRecorder =
StatusRecorder.<FlinkSessionJobStatus>create(
- client, new MetricManager<>(metricGroup), listeners);
+ client,
+ new MetricManager<>(metricGroup, configManager.getDefaultConfig()),
+ listeners);
var reconciler =
new SessionJobReconciler(
client, flinkService, configManager, eventRecorder, statusRecorder);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
index 3b1c088..a410d6f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
@@ -26,14 +26,12 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import java.util.Map;
-/** Flink based operator metric group. */
-public class KubernetesOperatorMetricGroup
- extends AbstractMetricGroup<KubernetesOperatorMetricGroup> {
+/** Base metric group for Flink Operator System metrics. */
+public class KubernetesOperatorMetricGroup extends AbstractMetricGroup<AbstractMetricGroup<?>> {
- private static final String GROUP_NAME = "k8soperator";
- private final String namespace;
- private final String name;
- private final String hostname;
+ protected final String namespace;
+ protected final String name;
+ protected final String hostname;
private KubernetesOperatorMetricGroup(
MetricRegistry registry,
@@ -47,6 +45,16 @@ public class KubernetesOperatorMetricGroup
this.hostname = hostname;
}
+ public KubernetesResourceNamespaceMetricGroup createResourceNamespaceGroup(
+ Configuration config, String resourceNs) {
+ return new KubernetesResourceNamespaceMetricGroup(
+ registry,
+ this,
+ KubernetesResourceNamespaceScopeFormat.fromConfig(config)
+ .formatScope(namespace, name, hostname, resourceNs),
+ resourceNs);
+ }
+
public static KubernetesOperatorMetricGroup create(
MetricRegistry metricRegistry,
Configuration configuration,
@@ -71,7 +79,7 @@ public class KubernetesOperatorMetricGroup
@Override
protected final String getGroupName(CharacterFilter filter) {
- return GROUP_NAME;
+ return "k8soperator";
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index 42e1d40..30b02f9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -23,8 +23,22 @@ import org.apache.flink.configuration.ConfigOptions;
/** Configuration options for metrics. */
public class KubernetesOperatorMetricOptions {
public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR =
- ConfigOptions.key("metrics.scope.k8soperator")
- .defaultValue("<host>.k8soperator.<namespace>.<name>")
+ ConfigOptions.key("metrics.scope.k8soperator.system")
+ .defaultValue("<host>.k8soperator.<namespace>.<name>.system")
+ .withDeprecatedKeys("metrics.scope.k8soperator")
.withDescription(
"Defines the scope format string that is applied to all metrics scoped to the kubernetes operator.");
+
+ public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCENS =
+ ConfigOptions.key("metrics.scope.k8soperator.resourcens")
+ .defaultValue("<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>")
+ .withDescription(
+ "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource namespace.");
+
+ public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCE =
+ ConfigOptions.key("metrics.scope.k8soperator.resource")
+ .defaultValue(
+ "<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>")
+ .withDescription(
+ "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource.");
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceMetricGroup.java
similarity index 51%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceMetricGroup.java
index 3b1c088..b76a594 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceMetricGroup.java
@@ -17,61 +17,36 @@
package org.apache.flink.kubernetes.operator.metrics;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import java.util.Map;
-/** Flink based operator metric group. */
-public class KubernetesOperatorMetricGroup
- extends AbstractMetricGroup<KubernetesOperatorMetricGroup> {
+/** Base metric group for Flink Operator Resource level metrics. */
+public class KubernetesResourceMetricGroup
+ extends AbstractMetricGroup<KubernetesResourceNamespaceMetricGroup> {
- private static final String GROUP_NAME = "k8soperator";
- private final String namespace;
- private final String name;
- private final String hostname;
+ private final String resourceName;
- private KubernetesOperatorMetricGroup(
+ protected KubernetesResourceMetricGroup(
MetricRegistry registry,
+ KubernetesResourceNamespaceMetricGroup parent,
String[] scope,
- String namespace,
- String name,
- String hostname) {
- super(registry, scope, null);
- this.namespace = namespace;
- this.name = name;
- this.hostname = hostname;
- }
-
- public static KubernetesOperatorMetricGroup create(
- MetricRegistry metricRegistry,
- Configuration configuration,
- String namespace,
- String name,
- String hostname) {
- return new KubernetesOperatorMetricGroup(
- metricRegistry,
- KubernetesOperatorScopeFormat.fromConfig(configuration)
- .formatScope(namespace, name, hostname),
- namespace,
- name,
- hostname);
+ String resourceName) {
+ super(registry, scope, parent);
+ this.resourceName = resourceName;
}
@Override
protected final void putVariables(Map<String, String> variables) {
- variables.put(KubernetesOperatorScopeFormat.NAMESPACE, namespace);
- variables.put(KubernetesOperatorScopeFormat.NAME, name);
- variables.put(ScopeFormat.SCOPE_HOST, hostname);
+ variables.put(KubernetesResourceScopeFormat.RESOURCE, resourceName);
}
@Override
protected final String getGroupName(CharacterFilter filter) {
- return GROUP_NAME;
+ return "resource";
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java
similarity index 56%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java
index 3b1c088..bd6f8e2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceMetricGroup.java
@@ -22,56 +22,47 @@ import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import java.util.Map;
-/** Flink based operator metric group. */
-public class KubernetesOperatorMetricGroup
+/** Base metric group for Flink Operator Resource namespace level metrics. */
+public class KubernetesResourceNamespaceMetricGroup
extends AbstractMetricGroup<KubernetesOperatorMetricGroup> {
- private static final String GROUP_NAME = "k8soperator";
- private final String namespace;
- private final String name;
- private final String hostname;
+ private final String resourceNs;
- private KubernetesOperatorMetricGroup(
+ protected KubernetesResourceNamespaceMetricGroup(
MetricRegistry registry,
+ KubernetesOperatorMetricGroup parent,
String[] scope,
- String namespace,
- String name,
- String hostname) {
- super(registry, scope, null);
- this.namespace = namespace;
- this.name = name;
- this.hostname = hostname;
+ String resourceNs) {
+ super(registry, scope, parent);
+ this.resourceNs = resourceNs;
}
- public static KubernetesOperatorMetricGroup create(
- MetricRegistry metricRegistry,
- Configuration configuration,
- String namespace,
- String name,
- String hostname) {
- return new KubernetesOperatorMetricGroup(
- metricRegistry,
- KubernetesOperatorScopeFormat.fromConfig(configuration)
- .formatScope(namespace, name, hostname),
- namespace,
- name,
- hostname);
+ public KubernetesResourceMetricGroup createResourceNamespaceGroup(
+ Configuration config, String resourceName) {
+ return new KubernetesResourceMetricGroup(
+ registry,
+ this,
+ KubernetesResourceScopeFormat.fromConfig(config)
+ .formatScope(
+ parent.namespace,
+ parent.name,
+ parent.hostname,
+ resourceNs,
+ resourceName),
+ resourceName);
}
@Override
protected final void putVariables(Map<String, String> variables) {
- variables.put(KubernetesOperatorScopeFormat.NAMESPACE, namespace);
- variables.put(KubernetesOperatorScopeFormat.NAME, name);
- variables.put(ScopeFormat.SCOPE_HOST, hostname);
+ variables.put(KubernetesResourceNamespaceScopeFormat.RESOURCE_NS, resourceNs);
}
@Override
protected final String getGroupName(CharacterFilter filter) {
- return GROUP_NAME;
+ return "namespace";
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java
new file mode 100644
index 0000000..17b6ffe
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceNamespaceScopeFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCENS;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorScopeFormat.NAME;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorScopeFormat.NAMESPACE;
+
+/** Scope format for operator metrics reported at the Kubernetes resource namespace level. */
+public class KubernetesResourceNamespaceScopeFormat extends ScopeFormat {
+
+ public static final String RESOURCE_NS = asVariable("resourcens");
+
+ public KubernetesResourceNamespaceScopeFormat(String format) {
+ super(format, null, new String[] {NAMESPACE, NAME, SCOPE_HOST, RESOURCE_NS});
+ }
+
+ public String[] formatScope(String namespace, String name, String hostname, String resourceNs) {
+ final String[] template = copyTemplate();
+ final String[] values = {namespace, name, hostname, resourceNs};
+ return bindVariables(template, values);
+ }
+
+ public static KubernetesResourceNamespaceScopeFormat fromConfig(Configuration config) {
+ String format = config.getString(SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCENS);
+ return new KubernetesResourceNamespaceScopeFormat(format);
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java
new file mode 100644
index 0000000..7f05ab6
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesResourceScopeFormat.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCE;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorScopeFormat.NAME;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorScopeFormat.NAMESPACE;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesResourceNamespaceScopeFormat.RESOURCE_NS;
+
+/** Scope format for operator metrics reported at the individual Kubernetes resource level. */
+public class KubernetesResourceScopeFormat extends ScopeFormat {
+
+ public static final String RESOURCE = asVariable("resourcename");
+
+ public KubernetesResourceScopeFormat(String format) {
+ super(format, null, new String[] {NAMESPACE, NAME, SCOPE_HOST, RESOURCE_NS, RESOURCE});
+ }
+
+ public String[] formatScope(
+ String namespace,
+ String name,
+ String hostname,
+ String resourceNs,
+ String resourceName) {
+ final String[] template = copyTemplate();
+ final String[] values = {namespace, name, hostname, resourceNs, resourceName};
+ return bindVariables(template, values);
+ }
+
+ public static KubernetesResourceScopeFormat fromConfig(Configuration config) {
+ String format = config.getString(SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCE);
+ return new KubernetesResourceScopeFormat(format);
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
index 8cd9c3d..18b96b4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/MetricManager.java
@@ -17,9 +17,9 @@
package org.apache.flink.kubernetes.operator.metrics;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
-import org.apache.flink.metrics.MetricGroup;
import io.fabric8.kubernetes.client.CustomResource;
@@ -28,12 +28,13 @@ import java.util.concurrent.ConcurrentHashMap;
/** Metric manager for Operator managed custom resources. */
public class MetricManager<CR extends CustomResource<?, ?>> {
- public static final String NS_SCOPE_KEY = "resourcens";
- private final MetricGroup metricGroup;
+ private final KubernetesOperatorMetricGroup opMetricGroup;
+ private final Configuration conf;
private final Map<String, CustomResourceMetrics> metrics = new ConcurrentHashMap<>();
- public MetricManager(MetricGroup metricGroup) {
- this.metricGroup = metricGroup;
+ public MetricManager(KubernetesOperatorMetricGroup opMetricGroup, Configuration conf) {
+ this.opMetricGroup = opMetricGroup;
+ this.conf = conf;
}
public void onUpdate(CR cr) {
@@ -50,12 +51,12 @@ public class MetricManager<CR extends CustomResource<?, ?>> {
}
private CustomResourceMetrics getCustomResourceMetricsImpl(CR cr) {
+ var namespaceMg =
+ opMetricGroup.createResourceNamespaceGroup(conf, cr.getMetadata().getNamespace());
if (cr instanceof FlinkDeployment) {
- return new FlinkDeploymentMetrics(
- metricGroup.addGroup(NS_SCOPE_KEY, cr.getMetadata().getNamespace()));
+ return new FlinkDeploymentMetrics(namespaceMg);
} else if (cr instanceof FlinkSessionJob) {
- return new FlinkSessionJobMetrics(
- metricGroup.addGroup(NS_SCOPE_KEY, cr.getMetadata().getNamespace()));
+ return new FlinkSessionJobMetrics(namespaceMg);
} else {
throw new IllegalArgumentException("Unknown CustomResource");
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
index 0e53caf..6040d39 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -43,7 +43,7 @@ public class OperatorMetricUtils {
private static final String OPERATOR_METRICS_PREFIX = "kubernetes.operator.metrics.";
private static final String METRICS_PREFIX = "metrics.";
- public static MetricGroup initOperatorMetrics(Configuration defaultConfig) {
+ public static KubernetesOperatorMetricGroup initOperatorMetrics(Configuration defaultConfig) {
Configuration metricConfig = createMetricConfig(defaultConfig);
LOG.info("Initializing operator metrics using conf: {}", metricConfig);
PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(metricConfig);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index f8b6290..038ffd7 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -18,9 +18,11 @@
package org.apache.flink.kubernetes.operator;
import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -36,6 +38,10 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerStatus;
@@ -421,6 +427,28 @@ public class TestUtils {
}
}
+ public static <T extends AbstractFlinkResource<?, ?>> MetricManager<T> createTestMetricManager(
+ Configuration conf) {
+ return createTestMetricManager(
+ TestingMetricRegistry.builder()
+ .setDelimiter('.')
+ .setRegisterConsumer((metric, name, group) -> {})
+ .build(),
+ conf);
+ }
+
+ public static <T extends AbstractFlinkResource<?, ?>> MetricManager<T> createTestMetricManager(
+ MetricRegistry metricRegistry, Configuration conf) {
+
+ return new MetricManager<>(createTestMetricGroup(metricRegistry, conf), conf);
+ }
+
+ public static KubernetesOperatorMetricGroup createTestMetricGroup(
+ MetricRegistry metricRegistry, Configuration conf) {
+ return KubernetesOperatorMetricGroup.create(
+ metricRegistry, conf, TEST_NAMESPACE, "testopname", "testhost");
+ }
+
/** Testing ResponseProvider. */
public static class ValidatingResponseProvider<T> implements ResponseProvider<Object> {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
index 8306761..5e9cfcc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
@@ -18,11 +18,10 @@
package org.apache.flink.kubernetes.operator;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
-import org.apache.flink.metrics.testutils.MetricListener;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -30,7 +29,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
public class TestingStatusRecorder<STATUS extends CommonStatus<?>> extends StatusRecorder<STATUS> {
public TestingStatusRecorder() {
- super(null, new MetricManager<>(new MetricListener().getMetricGroup()), (r, s) -> {});
+ super(null, TestUtils.createTestMetricManager(new Configuration()), (r, s) -> {});
}
@Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index f78b769..68a29f7 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -17,19 +17,18 @@
package org.apache.flink.kubernetes.operator.controller;
+import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
-import org.apache.flink.metrics.testutils.MetricListener;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
@@ -67,7 +66,7 @@ public class TestingFlinkDeploymentController
statusRecorder =
new StatusRecorder<>(
kubernetesClient,
- new MetricManager<>(new MetricListener().getMetricGroup()),
+ TestUtils.createTestMetricManager(configManager.getDefaultConfig()),
statusUpdateCounter);
flinkDeploymentController =
new FlinkDeploymentController(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
index 10a25da..e68c910 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
@@ -17,14 +17,13 @@
package org.apache.flink.kubernetes.operator.listener;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
-import org.apache.flink.metrics.testutils.MetricListener;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -56,7 +55,7 @@ public class FlinkResourceListenerTest {
var statusRecorder =
StatusRecorder.<FlinkDeploymentStatus>create(
kubernetesClient,
- new MetricManager<>(new MetricListener().getMetricGroup()),
+ TestUtils.createTestMetricManager(new Configuration()),
listeners);
var eventRecorder = EventRecorder.create(kubernetesClient, listeners);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
index 2e1bfc3..b02f112 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
@@ -44,10 +44,12 @@ public class KubernetesOperatorMetricGroupTest {
"flink-kubernetes-operator",
"localhost");
assertArrayEquals(
- new String[] {"localhost", "k8soperator", "default", "flink-kubernetes-operator"},
+ new String[] {
+ "localhost", "k8soperator", "default", "flink-kubernetes-operator", "system"
+ },
group.getScopeComponents());
assertEquals(
- "localhost.k8soperator.default.flink-kubernetes-operator.test",
+ "localhost.k8soperator.default.flink-kubernetes-operator.system.test",
group.getMetricIdentifier("test"));
assertEquals(
@@ -96,6 +98,48 @@ public class KubernetesOperatorMetricGroupTest {
registry.shutdown().get();
}
+ /**
+ * Verifies that the resource-namespace sub-group and the per-resource sub-group expose the
+ * operator-level scope variables plus their own {@code <resourcens>} / {@code <resourcename>}
+ * variables.
+ */
+ @Test
+ public void testSubGroupVariables() throws Exception {
+ var configuration = new Configuration();
+ var registry = new MetricRegistryImpl(fromConfiguration(configuration));
+ try {
+ var operatorMetricGroup =
+ KubernetesOperatorMetricGroup.create(
+ registry,
+ configuration,
+ "default",
+ "flink-kubernetes-operator",
+ "localhost");
+
+ var namespaceGroup =
+ operatorMetricGroup.createResourceNamespaceGroup(configuration, "rns");
+ // Despite its name, the assertions below expect this call to yield the per-resource
+ // group (it carries the <resourcename> variable).
+ var resourceGroup = namespaceGroup.createResourceNamespaceGroup(configuration, "rn");
+
+ assertEquals(
+ ImmutableMap.of(
+ "<host>",
+ "localhost",
+ "<namespace>",
+ "default",
+ "<name>",
+ "flink-kubernetes-operator",
+ "<resourcens>",
+ "rns"),
+ namespaceGroup.getAllVariables());
+ assertEquals(
+ ImmutableMap.of(
+ "<host>",
+ "localhost",
+ "<namespace>",
+ "default",
+ "<name>",
+ "flink-kubernetes-operator",
+ "<resourcens>",
+ "rns",
+ "<resourcename>",
+ "rn"),
+ resourceGroup.getAllVariables());
+ } finally {
+ // Always shut the registry down, even when an assertion above fails.
+ registry.shutdown().get();
+ }
+ }
+
/**
* Wraps {@code configuration} in a {@link MetricRegistryConfiguration}, passing
* {@link Long#MAX_VALUE} as the second argument (presumably the maximum metric frame size,
* i.e. effectively unbounded — NOTE(review): confirm against MetricRegistryConfiguration).
*/
private static MetricRegistryConfiguration fromConfiguration(Configuration configuration) {
return MetricRegistryConfiguration.fromConfiguration(configuration, Long.MAX_VALUE);
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
index e3cf2eb..52a2f45 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
@@ -17,21 +17,25 @@
package org.apache.flink.kubernetes.operator.utils;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
-import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.METRIC_GROUP_NAME;
-import static org.apache.flink.kubernetes.operator.metrics.MetricManager.NS_SCOPE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -47,7 +51,7 @@ public class StatusRecorderTest {
var helper =
new StatusRecorder<FlinkDeploymentStatus>(
kubernetesClient,
- new MetricManager<>(new MetricListener().getMetricGroup()),
+ TestUtils.createTestMetricManager(new Configuration()),
(e, s) -> {});
var deployment = TestUtils.buildApplicationCluster();
kubernetesClient.resource(deployment).createOrReplace();
@@ -70,49 +74,64 @@ public class StatusRecorderTest {
@Test
public void testFlinkDeploymentMetrics() throws InterruptedException {
- var metricListener = new MetricListener();
+ var metrics = new HashMap<String, Metric>();
+ TestingMetricRegistry registry =
+ TestingMetricRegistry.builder()
+ .setDelimiter(".".charAt(0))
+ .setRegisterConsumer(
+ (metric, name, group) -> {
+ metrics.put(group.getMetricIdentifier(name), metric);
+ })
+ .build();
+
+ var metricManager =
+ (MetricManager) TestUtils.createTestMetricManager(registry, new Configuration());
var helper =
new StatusRecorder<FlinkDeploymentStatus>(
- kubernetesClient,
- new MetricManager<>(metricListener.getMetricGroup()),
- (e, s) -> {});
+ kubernetesClient, metricManager, (e, s) -> {});
var deployment = TestUtils.buildApplicationCluster();
kubernetesClient.resource(deployment).createOrReplace();
helper.updateStatusFromCache(deployment);
- assertEquals(1, metricListener.getGauge(totalIdentifier(deployment)).get().getValue());
- assertEquals(1, metricListener.getGauge(perStatusIdentifier(deployment)).get().getValue());
+ assertEquals(1, ((Gauge) metrics.get(totalIdentifier(deployment))).getValue());
+ assertEquals(1, ((Gauge) metrics.get(perStatusIdentifier(deployment))).getValue());
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
deployment.getStatus().setJobManagerDeploymentStatus(status);
helper.patchAndCacheStatus(deployment);
- assertEquals(1, metricListener.getGauge(totalIdentifier(deployment)).get().getValue());
- assertEquals(
- 1, metricListener.getGauge(perStatusIdentifier(deployment)).get().getValue());
+ assertEquals(1, ((Gauge) metrics.get(totalIdentifier(deployment))).getValue());
+ assertEquals(1, ((Gauge) metrics.get(perStatusIdentifier(deployment))).getValue());
}
helper.removeCachedStatus(deployment);
- assertEquals(0, metricListener.getGauge(totalIdentifier(deployment)).get().getValue());
+ assertEquals(0, ((Gauge) metrics.get(totalIdentifier(deployment))).getValue());
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
- assertEquals(
- 0, metricListener.getGauge(perStatusIdentifier(deployment)).get().getValue());
+ assertEquals(0, ((Gauge) metrics.get(perStatusIdentifier(deployment))).getValue());
}
}
- private String[] totalIdentifier(FlinkDeployment deployment) {
- return new String[] {
- NS_SCOPE_KEY, deployment.getMetadata().getNamespace(), METRIC_GROUP_NAME, "Count"
- };
+ /**
+ * Returns the dot-delimited identifier of the total deployment-count gauge for the
+ * deployment's namespace. The leading segments assume the test metric group is scoped as
+ * testhost / flink-operator-test / testopname — keep in sync with TestUtils.
+ */
+ private String totalIdentifier(FlinkDeployment deployment) {
+ return String.join(
+ ".",
+ "testhost",
+ "k8soperator",
+ "flink-operator-test",
+ "testopname",
+ "namespace",
+ deployment.getMetadata().getNamespace(),
+ METRIC_GROUP_NAME,
+ "Count");
}
- private String[] perStatusIdentifier(FlinkDeployment deployment) {
- return new String[] {
- NS_SCOPE_KEY,
- deployment.getMetadata().getNamespace(),
- METRIC_GROUP_NAME,
- deployment.getStatus().getJobManagerDeploymentStatus().name(),
- "Count"
- };
+ /**
+ * Returns the dot-delimited identifier of the deployment-count gauge for the deployment's
+ * namespace and its current JobManager deployment status. The leading segments assume the
+ * test metric group is scoped as testhost / flink-operator-test / testopname — keep in sync
+ * with TestUtils.
+ */
+ private String perStatusIdentifier(FlinkDeployment deployment) {
+ return String.join(
+ ".",
+ "testhost",
+ "k8soperator",
+ "flink-operator-test",
+ "testopname",
+ "namespace",
+ deployment.getMetadata().getNamespace(),
+ METRIC_GROUP_NAME,
+ deployment.getStatus().getJobManagerDeploymentStatus().name(),
+ "Count");
}
}