You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/08 12:40:48 UTC

[flink-kubernetes-operator] 02/03: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 308f8add74165febaa2f80c61a2a9d242d8b0629
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Jul 7 14:13:38 2022 +0200

    [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter
---
 docs/content/docs/operations/metrics-logging.md    |   3 +-
 .../kubernetes_operator_config_configuration.html  |   6 +
 .../flink/kubernetes/operator/FlinkOperator.java   |   9 +-
 .../config/FlinkOperatorConfiguration.java         |   5 +
 .../config/KubernetesOperatorConfigOptions.java    |   7 +
 .../metrics/KubernetesOperatorMetricOptions.java   |   2 +-
 .../operator/metrics/OperatorJosdkMetrics.java     | 143 +++++++++++++++++++++
 .../operator/metrics/OperatorJosdkMetricsTest.java | 119 +++++++++++++++++
 8 files changed, 289 insertions(+), 5 deletions(-)

diff --git a/docs/content/docs/operations/metrics-logging.md b/docs/content/docs/operations/metrics-logging.md
index 0e34dcc..13aa01d 100644
--- a/docs/content/docs/operations/metrics-logging.md
+++ b/docs/content/docs/operations/metrics-logging.md
@@ -28,7 +28,6 @@ under the License.
 
 The Flink Kubernetes Operator (Operator) extends the [Flink Metric System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/) that allows gathering and exposing metrics to centralized monitoring solutions. 
 
-
 ## Deployment Metrics
 The Operator gathers aggregates metrics about managed resources.
 
@@ -39,7 +38,7 @@ The Operator gathers aggregates metrics about managed resources.
 | Namespace | FlinkSessionJob.Count          | Number of managed FlinkSessionJob instances per namespace                                                                                                   | Gauge |
 
 ## System Metrics
-The Operator gathers metrics about the JVM process and exposes it similarly to core Flink [System metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#system-metrics). The list of metrics are not repeated in this document. 
+The Operator gathers metrics about the JVM process and exposes it similarly to core Flink [System metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#system-metrics). The list of metrics is not repeated in this document.
 
 ## Metric Reporters
 
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index cfc1ba9..5c34d95 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -80,6 +80,12 @@
             <td>Boolean</td>
             <td>Enables last-state fallback for savepoint upgrade mode. When the job is not running thus savepoint cannot be triggered but HA metadata is available for last state restore the operator can initiate the upgrade process when the flag is enabled.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.josdk.metrics.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Enable forwarding of Java Operator SDK metrics to the Flink metric registry.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.observer.progress-check.interval</h5></td>
             <td style="word-wrap: break-word;">10 s</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 419702d..13bbe89 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
 import org.apache.flink.kubernetes.operator.listener.ListenerUtils;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
+import org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
 import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.observer.sessionjob.SessionJobObserver;
@@ -79,12 +80,12 @@ public class FlinkOperator {
                 conf != null
                         ? new FlinkConfigManager(conf) // For testing only
                         : new FlinkConfigManager(this::handleNamespaceChanges);
+        this.metricGroup =
+                OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
         this.operator = new Operator(client, this::overrideOperatorConfigs);
         this.flinkService = new FlinkService(client, configManager);
         this.validators = ValidatorUtils.discoverValidators(configManager);
         this.listeners = ListenerUtils.discoverListeners(configManager);
-        this.metricGroup =
-                OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
         PluginManager pluginManager =
                 PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
         FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
@@ -109,6 +110,10 @@ public class FlinkOperator {
             LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
             overrider.withConcurrentReconciliationThreads(parallelism);
         }
+        if (configManager.getOperatorConfiguration().getJosdkMetricsEnabled()) {
+            overrider.withMetrics(
+                    new OperatorJosdkMetrics(metricGroup, configManager.getDefaultConfig()));
+        }
     }
 
     private void registerDeploymentController() {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 52fbb85..d295501 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -43,6 +43,7 @@ public class FlinkOperatorConfiguration {
     String flinkServiceHostOverride;
     Set<String> watchedNamespaces;
     Boolean dynamicNamespacesEnabled;
+    Boolean josdkMetricsEnabled;
     Duration flinkCancelJobTimeout;
     Duration flinkShutdownClusterTimeout;
     String artifactsBaseDir;
@@ -108,6 +109,9 @@ public class FlinkOperatorConfiguration {
                 operatorConfig.get(
                         KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_NAMESPACES_ENABLED);
 
+        boolean josdkMetricsEnabled =
+                operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_JOSDK_METRICS_ENABLED);
+
         RetryConfiguration retryConfiguration = new FlinkOperatorRetryConfiguration(operatorConfig);
 
         return new FlinkOperatorConfiguration(
@@ -119,6 +123,7 @@ public class FlinkOperatorConfiguration {
                 flinkServiceHostOverride,
                 watchedNamespaces,
                 dynamicNamespacesEnabled,
+                josdkMetricsEnabled,
                 flinkCancelJobTimeout,
                 flinkShutdownClusterTimeout,
                 artifactsBaseDir,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index aa6031c..b01e845 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -210,6 +210,13 @@ public class KubernetesOperatorConfigOptions {
                     .defaultValue(false)
                     .withDescription("Enables dynamic change of watched/monitored namespaces.");
 
+    public static final ConfigOption<Boolean> OPERATOR_JOSDK_METRICS_ENABLED =
+            ConfigOptions.key("kubernetes.operator.josdk.metrics.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enable forwarding of Java Operator SDK metrics to the Flink metric registry.");
+
     public static final ConfigOption<Duration> OPERATOR_RETRY_INITIAL_INTERVAL =
             ConfigOptions.key("kubernetes.operator.retry.initial.interval")
                     .durationType()
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index 30b02f9..20e0f94 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -31,7 +31,7 @@ public class KubernetesOperatorMetricOptions {
 
     public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCENS =
             ConfigOptions.key("metrics.scope.k8soperator.resourcens")
-                    .defaultValue("<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>.")
+                    .defaultValue("<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>")
                     .withDescription(
                             "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource namespace.");
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
new file mode 100644
index 0000000..c35ff53
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+
+import io.javaoperatorsdk.operator.api.monitoring.Metrics;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.processing.event.Event;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Forwards JOSDK {@link Metrics} callbacks as counters and gauges to the {@link MetricRegistry}.
+ */
+public class OperatorJosdkMetrics implements Metrics {
+
+    private static final String OPERATOR_SDK_GROUP = "JOSDK";
+    private static final String RECONCILIATION = "Reconciliation";
+    private static final String RESOURCE = "Resource";
+    private static final String EVENT = "Event";
+
+    private final KubernetesOperatorMetricGroup operatorMetricGroup;
+    private final Configuration conf;
+
+    // Metric groups created so far, cached per resource ID.
+    private final Map<ResourceID, KubernetesResourceNamespaceMetricGroup> resourceNsMetricGroups =
+            new ConcurrentHashMap<>();
+    private final Map<ResourceID, KubernetesResourceMetricGroup> resourceMetricGroups =
+            new ConcurrentHashMap<>();
+
+    // Counters keyed by the dot-joined scope components of the group they were created in.
+    private final Map<String, Counter> counters = new ConcurrentHashMap<>();
+
+    public OperatorJosdkMetrics(
+            KubernetesOperatorMetricGroup operatorMetricGroup, Configuration conf) {
+        this.operatorMetricGroup = operatorMetricGroup;
+        this.conf = conf;
+    }
+
+    /** Counts resource events in total and per action; other event types are ignored. */
+    @Override
+    public void receivedEvent(Event event) {
+        if (!(event instanceof ResourceEvent)) {
+            return;
+        }
+        KubernetesResourceMetricGroup resourceMg =
+                getResourceMg(event.getRelatedCustomResourceID());
+        increment(resourceMg, RESOURCE, EVENT);
+        increment(resourceMg, RESOURCE, EVENT, ((ResourceEvent) event).getAction().name());
+    }
+
+    /** Increments the cleanup counter of the given resource. */
+    @Override
+    public void cleanupDoneFor(ResourceID resourceID) {
+        increment(getResourceMg(resourceID), RECONCILIATION, "cleanup");
+    }
+
+    /** Counts each reconciliation, plus a retry whenever retry information is present. */
+    @Override
+    public void reconcileCustomResource(ResourceID resourceID, RetryInfo retryInfoNullable) {
+        KubernetesResourceMetricGroup resourceMg = getResourceMg(resourceID);
+        increment(resourceMg, RECONCILIATION);
+        if (retryInfoNullable != null) {
+            increment(resourceMg, RECONCILIATION, "retries");
+        }
+    }
+
+    /** Increments the finished counter of the given resource. */
+    @Override
+    public void finishedReconciliation(ResourceID resourceID) {
+        increment(getResourceMg(resourceID), RECONCILIATION, "finished");
+    }
+
+    /** Increments the failed counter of the given resource; the exception is not recorded. */
+    @Override
+    public void failedReconciliation(ResourceID resourceID, Exception exception) {
+        increment(getResourceMg(resourceID), RECONCILIATION, "failed");
+    }
+
+    /** Registers a {@code size} gauge for {@code map} in a group named {@code name}. */
+    @Override
+    public <T extends Map<?, ?>> T monitorSizeOf(T map, String name) {
+        MetricGroup mapGroup = operatorMetricGroup.addGroup(name);
+        mapGroup.gauge("size", map::size);
+        return map;
+    }
+
+    private void increment(MetricGroup parent, String... names) {
+        counter(parent, Collections.emptyList(), names).inc();
+    }
+
+    private Counter counter(
+            MetricGroup parent, List<Tuple2<String, String>> additionalTags, String... names) {
+        MetricGroup current = parent.addGroup(OPERATOR_SDK_GROUP);
+        for (String name : names) {
+            current = current.addGroup(name);
+        }
+        for (Tuple2<String, String> tag : additionalTags) {
+            current = current.addGroup(tag.f0, tag.f1);
+        }
+        MetricGroup target = current;
+        String key = String.join(".", target.getScopeComponents());
+        return counters.computeIfAbsent(key, ignored -> target.counter("Count"));
+    }
+
+    private KubernetesResourceNamespaceMetricGroup getResourceNsMg(ResourceID resourceID) {
+        return resourceNsMetricGroups.computeIfAbsent(
+                resourceID,
+                id -> operatorMetricGroup.createResourceNamespaceGroup(
+                        conf, id.getNamespace().orElse("default")));
+    }
+
+    private KubernetesResourceMetricGroup getResourceMg(ResourceID resourceID) {
+        return resourceMetricGroups.computeIfAbsent(
+                resourceID,
+                id -> getResourceNsMg(id).createResourceNamespaceGroup(conf, id.getName()));
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
new file mode 100644
index 0000000..0a719d7
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
+import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link OperatorJosdkMetrics}. */
+public class OperatorJosdkMetricsTest {
+
+    private final ResourceID resourceId = new ResourceID("testns", "testname");
+    private final String resourcePrefix =
+            "testhost.k8soperator.flink-operator-test.testopname.resource.testname.testns.JOSDK.";
+    private final String systemPrefix =
+            "testhost.k8soperator.flink-operator-test.testopname.system.";
+
+    private Map<String, Metric> metrics = new HashMap<>();
+    private OperatorJosdkMetrics operatorMetrics;
+
+    @BeforeEach
+    public void setup() {
+        // Record each registered metric under its full metric identifier.
+        TestingMetricRegistry registry =
+                TestingMetricRegistry.builder()
+                        .setDelimiter('.')
+                        .setRegisterConsumer(
+                                (metric, name, group) ->
+                                        metrics.put(group.getMetricIdentifier(name), metric))
+                        .build();
+        var testGroup = TestUtils.createTestMetricGroup(registry, new Configuration());
+        operatorMetrics = new OperatorJosdkMetrics(testGroup, new Configuration());
+    }
+
+    @Test
+    public void testMetrics() {
+        // Repeated failures must reuse one counter rather than register new metrics.
+        operatorMetrics.failedReconciliation(resourceId, null);
+        assertEquals(1, metrics.size());
+        assertEquals(1, getCount("Reconciliation.failed"));
+        operatorMetrics.failedReconciliation(resourceId, null);
+        operatorMetrics.failedReconciliation(resourceId, null);
+        assertEquals(1, metrics.size());
+        assertEquals(3, getCount("Reconciliation.failed"));
+
+        operatorMetrics.reconcileCustomResource(resourceId, null);
+        assertEquals(2, metrics.size());
+        assertEquals(1, getCount("Reconciliation"));
+
+        // A non-null RetryInfo additionally registers and bumps the retries counter.
+        RetryInfo retryInfo =
+                new RetryInfo() {
+                    @Override
+                    public int getAttemptCount() {
+                        return 0;
+                    }
+
+                    @Override
+                    public boolean isLastAttempt() {
+                        return false;
+                    }
+                };
+        operatorMetrics.reconcileCustomResource(resourceId, retryInfo);
+        assertEquals(3, metrics.size());
+        assertEquals(2, getCount("Reconciliation"));
+        assertEquals(1, getCount("Reconciliation.retries"));
+
+        operatorMetrics.receivedEvent(new ResourceEvent(ResourceAction.ADDED, resourceId, null));
+        assertEquals(5, metrics.size());
+        assertEquals(1, getCount("Resource.Event"));
+        assertEquals(1, getCount("Resource.Event.ADDED"));
+
+        operatorMetrics.cleanupDoneFor(resourceId);
+        assertEquals(6, metrics.size());
+        assertEquals(1, getCount("Reconciliation.cleanup"));
+
+        operatorMetrics.finishedReconciliation(resourceId);
+        assertEquals(7, metrics.size());
+        assertEquals(1, getCount("Reconciliation.finished"));
+
+        operatorMetrics.monitorSizeOf(Map.of("a", "b", "c", "d"), "mymap");
+        assertEquals(8, metrics.size());
+        assertEquals(2, ((Gauge<Integer>) metrics.get(systemPrefix + "mymap.size")).getValue());
+    }
+
+    private long getCount(String name) {
+        return ((Counter) metrics.get(resourcePrefix + name + ".Count")).getCount();
+    }
+}