You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/18 12:09:26 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-27914] Fix SessionJobController name + avoid creating new metric groups all the time

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new f2e559a9 [FLINK-27914] Fix SessionJobController name + avoid creating new metric groups all the time
f2e559a9 is described below

commit f2e559a925881b78e89d5589f67fa622b751a1c4
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Mon Jul 18 14:09:22 2022 +0200

    [FLINK-27914] Fix SessionJobController name + avoid creating new metric groups all the time
---
 .../kubernetes_operator_metric_configuration.html  |  6 ++
 .../metrics/KubernetesOperatorMetricOptions.java   |  6 ++
 .../operator/metrics/OperatorJosdkMetrics.java     | 91 ++++++++++------------
 .../operator/metrics/OperatorMetricUtils.java      | 13 ++--
 .../operator/metrics/OperatorJosdkMetricsTest.java | 13 +++-
 5 files changed, 74 insertions(+), 55 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
index 9e934ea4..d5d23294 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
@@ -14,6 +14,12 @@
             <td>Boolean</td>
             <td>Enable forwarding of Java Operator SDK metrics to the Flink metric registry.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.jvm.metrics.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Enable Kubernetes Operator JVM metrics.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.kubernetes.client.metrics.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index c8628b3e..faba69d0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -23,6 +23,12 @@ import org.apache.flink.configuration.ConfigOptions;
 /** Configuration options for metrics. */
 public class KubernetesOperatorMetricOptions {
 
+    public static final ConfigOption<Boolean> OPERATOR_JVM_METRICS_ENABLED =
+            ConfigOptions.key("kubernetes.operator.jvm.metrics.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Enable Kubernetes Operator JVM metrics.");
+
     public static final ConfigOption<Boolean> OPERATOR_JOSDK_METRICS_ENABLED =
             ConfigOptions.key("kubernetes.operator.josdk.metrics.enabled")
                     .booleanType()
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
index 15a14ca7..6b880e8c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
@@ -17,10 +17,9 @@
 
 package org.apache.flink.kubernetes.operator.metrics;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
-import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MetricGroup;
@@ -34,8 +33,8 @@ import io.javaoperatorsdk.operator.processing.event.Event;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
 
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -60,14 +59,14 @@ public class OperatorJosdkMetrics implements Metrics {
     private final Map<ResourceID, KubernetesResourceMetricGroup> resourceMetricGroups =
             new ConcurrentHashMap<>();
 
-    private final Map<String, Histogram> histograms = new ConcurrentHashMap<>();
-    private final Map<String, Counter> counters = new ConcurrentHashMap<>();
+    private final Map<List<String>, Histogram> histograms = new ConcurrentHashMap<>();
+    private final Map<List<String>, Counter> counters = new ConcurrentHashMap<>();
 
     private static final Map<String, String> CONTROLLERS =
             Map.of(
                     FlinkDeploymentController.class.getSimpleName().toLowerCase(),
                     "FlinkDeployment",
-                    FlinkSessionJob.class.getSimpleName().toLowerCase(),
+                    FlinkSessionJobController.class.getSimpleName().toLowerCase(),
                     "FlinkSessionJob");
 
     public OperatorJosdkMetrics(
@@ -95,15 +94,9 @@ public class OperatorJosdkMetrics implements Metrics {
     public void receivedEvent(Event event) {
         if (event instanceof ResourceEvent) {
             var action = ((ResourceEvent) event).getAction();
+            counter(getResourceMg(event.getRelatedCustomResourceID()), RESOURCE, EVENT).inc();
             counter(
                             getResourceMg(event.getRelatedCustomResourceID()),
-                            Collections.emptyList(),
-                            RESOURCE,
-                            EVENT)
-                    .inc();
-            counter(
-                            getResourceMg(event.getRelatedCustomResourceID()),
-                            Collections.emptyList(),
                             RESOURCE,
                             EVENT,
                             action.name())
@@ -113,29 +106,26 @@ public class OperatorJosdkMetrics implements Metrics {
 
     @Override
     public void cleanupDoneFor(ResourceID resourceID) {
-        counter(getResourceMg(resourceID), Collections.emptyList(), RECONCILIATION, "cleanup")
-                .inc();
+        counter(getResourceMg(resourceID), RECONCILIATION, "cleanup").inc();
     }
 
     @Override
     public void reconcileCustomResource(ResourceID resourceID, RetryInfo retryInfoNullable) {
-        counter(getResourceMg(resourceID), Collections.emptyList(), RECONCILIATION).inc();
+        counter(getResourceMg(resourceID), RECONCILIATION).inc();
 
         if (retryInfoNullable != null) {
-            counter(getResourceMg(resourceID), Collections.emptyList(), RECONCILIATION, "retries")
-                    .inc();
+            counter(getResourceMg(resourceID), RECONCILIATION, "retries").inc();
         }
     }
 
     @Override
     public void finishedReconciliation(ResourceID resourceID) {
-        counter(getResourceMg(resourceID), Collections.emptyList(), RECONCILIATION, "finished")
-                .inc();
+        counter(getResourceMg(resourceID), RECONCILIATION, "finished").inc();
     }
 
     @Override
     public void failedReconciliation(ResourceID resourceID, Exception exception) {
-        counter(getResourceMg(resourceID), Collections.emptyList(), RECONCILIATION, "failed").inc();
+        counter(getResourceMg(resourceID), RECONCILIATION, "failed").inc();
     }
 
     @Override
@@ -145,41 +135,46 @@ public class OperatorJosdkMetrics implements Metrics {
     }
 
     private Histogram histogram(ControllerExecution<?> execution, String name) {
-        MetricGroup group = operatorMetricGroup.addGroup(OPERATOR_SDK_GROUP);
-        for (String metricGroup :
-                Arrays.asList(
-                        CONTROLLERS.get(execution.controllerName().toLowerCase()),
-                        execution.name(),
-                        name)) {
-            group = group.addGroup(metricGroup);
-        }
-        var finalGroup = group;
+        var groups = getHistoGroups(execution, name);
         return histograms.computeIfAbsent(
-                String.join(".", group.getScopeComponents()),
-                s ->
-                        finalGroup.histogram(
-                                "TimeSeconds",
-                                OperatorMetricUtils.createHistogram(
-                                        configManager.getOperatorConfiguration())));
+                groups,
+                k -> {
+                    var group = operatorMetricGroup.addGroup(OPERATOR_SDK_GROUP);
+                    for (String mg : groups) {
+                        group = group.addGroup(mg);
+                    }
+                    var finalGroup = group;
+                    return finalGroup.histogram(
+                            "TimeSeconds",
+                            OperatorMetricUtils.createHistogram(
+                                    configManager.getOperatorConfiguration()));
+                });
+    }
+
+    private List<String> getHistoGroups(ControllerExecution<?> execution, String name) {
+        return List.of(
+                CONTROLLERS.get(execution.controllerName().toLowerCase()), execution.name(), name);
     }
 
     private long toSeconds(long startTime) {
         return TimeUnit.NANOSECONDS.toSeconds(clock.relativeTimeNanos() - startTime);
     }
 
-    private Counter counter(
-            MetricGroup parent, List<Tuple2<String, String>> additionalTags, String... names) {
-        MetricGroup group = parent.addGroup(OPERATOR_SDK_GROUP);
-        for (String name : names) {
-            group = group.addGroup(name);
-        }
-        for (Tuple2<String, String> tag : additionalTags) {
-            group = group.addGroup(tag.f0, tag.f1);
-        }
-        var finalGroup = group;
+    private Counter counter(MetricGroup parent, String... names) {
+        var key = new ArrayList<String>(parent.getScopeComponents().length + names.length);
+        Arrays.stream(parent.getScopeComponents()).forEach(key::add);
+        Arrays.stream(names).forEach(key::add);
+
         return counters.computeIfAbsent(
-                String.join(".", group.getScopeComponents()),
-                s -> OperatorMetricUtils.synchronizedCounter(finalGroup.counter("Count")));
+                key,
+                s -> {
+                    MetricGroup group = parent.addGroup(OPERATOR_SDK_GROUP);
+                    for (String name : names) {
+                        group = group.addGroup(name);
+                    }
+                    var finalGroup = group;
+                    return OperatorMetricUtils.synchronizedCounter(finalGroup.counter("Count"));
+                });
     }
 
     private KubernetesResourceNamespaceMetricGroup getResourceNsMg(ResourceID resourceID) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
index 0ff3f2bf..6b989907 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -64,8 +64,12 @@ public class OperatorMetricUtils {
                         EnvUtils.getOrDefault(
                                 EnvUtils.ENV_OPERATOR_NAME, "flink-kubernetes-operator"),
                         EnvUtils.getOrDefault(EnvUtils.ENV_HOSTNAME, "localhost"));
-        MetricGroup statusGroup = operatorMetricGroup.addGroup("Status");
-        MetricUtils.instantiateStatusMetrics(statusGroup);
+
+        if (defaultConfig.getBoolean(
+                KubernetesOperatorMetricOptions.OPERATOR_JVM_METRICS_ENABLED)) {
+            MetricGroup statusGroup = operatorMetricGroup.addGroup("Status");
+            MetricUtils.instantiateStatusMetrics(statusGroup);
+        }
         return operatorMetricGroup;
     }
 
@@ -107,9 +111,8 @@ public class OperatorMetricUtils {
     }
 
     public static Histogram createHistogram(FlinkOperatorConfiguration operatorConfiguration) {
-        return synchronizedHistogram(
-                new DescriptiveStatisticsHistogram(
-                        operatorConfiguration.getMetricsHistogramSampleSize()));
+        return new DescriptiveStatisticsHistogram(
+                operatorConfiguration.getMetricsHistogramSampleSize());
     }
 
     /** Thread safe {@link Histogram} wrapper. */
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
index fc49c401..5b0186db 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
@@ -45,10 +45,10 @@ import static org.junit.jupiter.api.Assertions.fail;
 /** {@link OperatorJosdkMetrics} tests. */
 public class OperatorJosdkMetricsTest {
 
-    private final ResourceID resourceId = new ResourceID("testns", "testname");
+    private final ResourceID resourceId = new ResourceID("testname", "testns");
     private final String controllerName = FlinkDeploymentController.class.getSimpleName();
     private final String resourcePrefix =
-            "testhost.k8soperator.flink-operator-test.testopname.resource.testname.testns.JOSDK.";
+            "testhost.k8soperator.flink-operator-test.testopname.resource.testns.testname.JOSDK.";
     private final String systemPrefix =
             "testhost.k8soperator.flink-operator-test.testopname.system.";
     private final String executionPrefix = systemPrefix + "JOSDK.FlinkDeployment.";
@@ -200,6 +200,15 @@ public class OperatorJosdkMetricsTest {
         operatorMetrics.monitorSizeOf(Map.of("a", "b", "c", "d"), "mymap");
         assertEquals(8, metrics.size());
         assertEquals(2, ((Gauge<Integer>) metrics.get(systemPrefix + "mymap.size")).getValue());
+
+        operatorMetrics.reconcileCustomResource(new ResourceID("other", "otherns"), null);
+        assertEquals(9, metrics.size());
+        assertEquals(
+                1,
+                ((Counter)
+                                metrics.get(
+                                        "testhost.k8soperator.flink-operator-test.testopname.resource.otherns.other.JOSDK.Reconciliation.Count"))
+                        .getCount());
     }
 
     private Histogram getHistogram(String... names) {