You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/18 12:09:26 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-27914] Fix SessionJobController name + avoid creating new metric groups all the time
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new f2e559a9 [FLINK-27914] Fix SessionJobController name + avoid creating new metric groups all the time
f2e559a9 is described below
commit f2e559a925881b78e89d5589f67fa622b751a1c4
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Mon Jul 18 14:09:22 2022 +0200
[FLINK-27914] Fix SessionJobController name + avoid creating new metric groups all the time
---
.../kubernetes_operator_metric_configuration.html | 6 ++
.../metrics/KubernetesOperatorMetricOptions.java | 6 ++
.../operator/metrics/OperatorJosdkMetrics.java | 91 ++++++++++------------
.../operator/metrics/OperatorMetricUtils.java | 13 ++--
.../operator/metrics/OperatorJosdkMetricsTest.java | 13 +++-
5 files changed, 74 insertions(+), 55 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
index 9e934ea4..d5d23294 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
@@ -14,6 +14,12 @@
<td>Boolean</td>
<td>Enable forwarding of Java Operator SDK metrics to the Flink metric registry.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.jvm.metrics.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Enable Kubernetes Operator JVM metrics.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.kubernetes.client.metrics.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index c8628b3e..faba69d0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -23,6 +23,12 @@ import org.apache.flink.configuration.ConfigOptions;
/** Configuration options for metrics. */
public class KubernetesOperatorMetricOptions {
+ public static final ConfigOption<Boolean> OPERATOR_JVM_METRICS_ENABLED =
+ ConfigOptions.key("kubernetes.operator.jvm.metrics.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Enable Kubernetes Operator JVM metrics.");
+
public static final ConfigOption<Boolean> OPERATOR_JOSDK_METRICS_ENABLED =
ConfigOptions.key("kubernetes.operator.josdk.metrics.enabled")
.booleanType()
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
index 15a14ca7..6b880e8c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
@@ -17,10 +17,9 @@
package org.apache.flink.kubernetes.operator.metrics;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
-import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
@@ -34,8 +33,8 @@ import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -60,14 +59,14 @@ public class OperatorJosdkMetrics implements Metrics {
private final Map<ResourceID, KubernetesResourceMetricGroup> resourceMetricGroups =
new ConcurrentHashMap<>();
- private final Map<String, Histogram> histograms = new ConcurrentHashMap<>();
- private final Map<String, Counter> counters = new ConcurrentHashMap<>();
+ private final Map<List<String>, Histogram> histograms = new ConcurrentHashMap<>();
+ private final Map<List<String>, Counter> counters = new ConcurrentHashMap<>();
private static final Map<String, String> CONTROLLERS =
Map.of(
FlinkDeploymentController.class.getSimpleName().toLowerCase(),
"FlinkDeployment",
- FlinkSessionJob.class.getSimpleName().toLowerCase(),
+ FlinkSessionJobController.class.getSimpleName().toLowerCase(),
"FlinkSessionJob");
public OperatorJosdkMetrics(
@@ -95,15 +94,9 @@ public class OperatorJosdkMetrics implements Metrics {
public void receivedEvent(Event event) {
if (event instanceof ResourceEvent) {
var action = ((ResourceEvent) event).getAction();
+ counter(getResourceMg(event.getRelatedCustomResourceID()), RESOURCE, EVENT).inc();
counter(
getResourceMg(event.getRelatedCustomResourceID()),
- Collections.emptyList(),
- RESOURCE,
- EVENT)
- .inc();
- counter(
- getResourceMg(event.getRelatedCustomResourceID()),
- Collections.emptyList(),
RESOURCE,
EVENT,
action.name())
@@ -113,29 +106,26 @@ public class OperatorJosdkMetrics implements Metrics {
@Override
public void cleanupDoneFor(ResourceID resourceID) {
- counter(getResourceMg(resourceID), Collections.emptyList(), RECONCILIATION, "cleanup")
- .inc();
+ counter(getResourceMg(resourceID), RECONCILIATION, "cleanup").inc();
}
@Override
public void reconcileCustomResource(ResourceID resourceID, RetryInfo retryInfoNullable) {
- counter(getResourceMg(resourceID), Collections.emptyList(), RECONCILIATION).inc();
+ counter(getResourceMg(resourceID), RECONCILIATION).inc();
if (retryInfoNullable != null) {
- counter(getResourceMg(resourceID), Collections.emptyList(), RECONCILIATION, "retries")
- .inc();
+ counter(getResourceMg(resourceID), RECONCILIATION, "retries").inc();
}
}
@Override
public void finishedReconciliation(ResourceID resourceID) {
- counter(getResourceMg(resourceID), Collections.emptyList(), RECONCILIATION, "finished")
- .inc();
+ counter(getResourceMg(resourceID), RECONCILIATION, "finished").inc();
}
@Override
public void failedReconciliation(ResourceID resourceID, Exception exception) {
- counter(getResourceMg(resourceID), Collections.emptyList(), RECONCILIATION, "failed").inc();
+ counter(getResourceMg(resourceID), RECONCILIATION, "failed").inc();
}
@Override
@@ -145,41 +135,46 @@ public class OperatorJosdkMetrics implements Metrics {
}
private Histogram histogram(ControllerExecution<?> execution, String name) {
- MetricGroup group = operatorMetricGroup.addGroup(OPERATOR_SDK_GROUP);
- for (String metricGroup :
- Arrays.asList(
- CONTROLLERS.get(execution.controllerName().toLowerCase()),
- execution.name(),
- name)) {
- group = group.addGroup(metricGroup);
- }
- var finalGroup = group;
+ var groups = getHistoGroups(execution, name);
return histograms.computeIfAbsent(
- String.join(".", group.getScopeComponents()),
- s ->
- finalGroup.histogram(
- "TimeSeconds",
- OperatorMetricUtils.createHistogram(
- configManager.getOperatorConfiguration())));
+ groups,
+ k -> {
+ var group = operatorMetricGroup.addGroup(OPERATOR_SDK_GROUP);
+ for (String mg : groups) {
+ group = group.addGroup(mg);
+ }
+ var finalGroup = group;
+ return finalGroup.histogram(
+ "TimeSeconds",
+ OperatorMetricUtils.createHistogram(
+ configManager.getOperatorConfiguration()));
+ });
+ }
+
+ private List<String> getHistoGroups(ControllerExecution<?> execution, String name) {
+ return List.of(
+ CONTROLLERS.get(execution.controllerName().toLowerCase()), execution.name(), name);
}
private long toSeconds(long startTime) {
return TimeUnit.NANOSECONDS.toSeconds(clock.relativeTimeNanos() - startTime);
}
- private Counter counter(
- MetricGroup parent, List<Tuple2<String, String>> additionalTags, String... names) {
- MetricGroup group = parent.addGroup(OPERATOR_SDK_GROUP);
- for (String name : names) {
- group = group.addGroup(name);
- }
- for (Tuple2<String, String> tag : additionalTags) {
- group = group.addGroup(tag.f0, tag.f1);
- }
- var finalGroup = group;
+ private Counter counter(MetricGroup parent, String... names) {
+ var key = new ArrayList<String>(parent.getScopeComponents().length + names.length);
+ Arrays.stream(parent.getScopeComponents()).forEach(key::add);
+ Arrays.stream(names).forEach(key::add);
+
return counters.computeIfAbsent(
- String.join(".", group.getScopeComponents()),
- s -> OperatorMetricUtils.synchronizedCounter(finalGroup.counter("Count")));
+ key,
+ s -> {
+ MetricGroup group = parent.addGroup(OPERATOR_SDK_GROUP);
+ for (String name : names) {
+ group = group.addGroup(name);
+ }
+ var finalGroup = group;
+ return OperatorMetricUtils.synchronizedCounter(finalGroup.counter("Count"));
+ });
}
private KubernetesResourceNamespaceMetricGroup getResourceNsMg(ResourceID resourceID) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
index 0ff3f2bf..6b989907 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -64,8 +64,12 @@ public class OperatorMetricUtils {
EnvUtils.getOrDefault(
EnvUtils.ENV_OPERATOR_NAME, "flink-kubernetes-operator"),
EnvUtils.getOrDefault(EnvUtils.ENV_HOSTNAME, "localhost"));
- MetricGroup statusGroup = operatorMetricGroup.addGroup("Status");
- MetricUtils.instantiateStatusMetrics(statusGroup);
+
+ if (defaultConfig.getBoolean(
+ KubernetesOperatorMetricOptions.OPERATOR_JVM_METRICS_ENABLED)) {
+ MetricGroup statusGroup = operatorMetricGroup.addGroup("Status");
+ MetricUtils.instantiateStatusMetrics(statusGroup);
+ }
return operatorMetricGroup;
}
@@ -107,9 +111,8 @@ public class OperatorMetricUtils {
}
public static Histogram createHistogram(FlinkOperatorConfiguration operatorConfiguration) {
- return synchronizedHistogram(
- new DescriptiveStatisticsHistogram(
- operatorConfiguration.getMetricsHistogramSampleSize()));
+ return new DescriptiveStatisticsHistogram(
+ operatorConfiguration.getMetricsHistogramSampleSize());
}
/** Thread safe {@link Histogram} wrapper. */
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
index fc49c401..5b0186db 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
@@ -45,10 +45,10 @@ import static org.junit.jupiter.api.Assertions.fail;
/** {@link OperatorJosdkMetrics} tests. */
public class OperatorJosdkMetricsTest {
- private final ResourceID resourceId = new ResourceID("testns", "testname");
+ private final ResourceID resourceId = new ResourceID("testname", "testns");
private final String controllerName = FlinkDeploymentController.class.getSimpleName();
private final String resourcePrefix =
- "testhost.k8soperator.flink-operator-test.testopname.resource.testname.testns.JOSDK.";
+ "testhost.k8soperator.flink-operator-test.testopname.resource.testns.testname.JOSDK.";
private final String systemPrefix =
"testhost.k8soperator.flink-operator-test.testopname.system.";
private final String executionPrefix = systemPrefix + "JOSDK.FlinkDeployment.";
@@ -200,6 +200,15 @@ public class OperatorJosdkMetricsTest {
operatorMetrics.monitorSizeOf(Map.of("a", "b", "c", "d"), "mymap");
assertEquals(8, metrics.size());
assertEquals(2, ((Gauge<Integer>) metrics.get(systemPrefix + "mymap.size")).getValue());
+
+ operatorMetrics.reconcileCustomResource(new ResourceID("other", "otherns"), null);
+ assertEquals(9, metrics.size());
+ assertEquals(
+ 1,
+ ((Counter)
+ metrics.get(
+ "testhost.k8soperator.flink-operator-test.testopname.resource.otherns.other.JOSDK.Reconciliation.Count"))
+ .getCount());
}
private Histogram getHistogram(String... names) {