You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/14 18:55:44 UTC

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

gyfora commented on code in PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#discussion_r897218837


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkOperatorMetrics.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+
+import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.api.monitoring.Metrics;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.processing.event.Event;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of {@link Metrics} to monitor the operations of {@link Operator} and forward
+ * metrics to {@link MetricRegistry}.
+ */
+public class FlinkOperatorMetrics implements Metrics {
+
+    private static final String RECONCILIATIONS = "reconciliations.";
+    private final MetricGroup metricGroup;
+    private final Map<String, Counter> counters;
+
+    public FlinkOperatorMetrics(MetricGroup metricGroup) {
+        this.metricGroup = metricGroup;
+        this.counters = new ConcurrentHashMap<>();
+    }
+
+    public <T> T timeControllerExecution(ControllerExecution<T> execution) throws Exception {
+        String executionMetric =
+                String.format(
+                        "controller.%s.execution.%s", execution.controllerName(), execution.name());
+        counter(executionMetric);
+        try {
+            T result = execution.execute();
+            counter(executionMetric + ".success");
+            counter(executionMetric + ".success." + execution.successTypeName(result));
+            return result;
+        } catch (Exception e) {
+            counter(executionMetric + ".failure");
+            counter(executionMetric + ".failure." + e.getClass().getSimpleName());
+            throw e;
+        }
+    }
+
+    public void receivedEvent(Event event) {
+        incrementCounter(
+                event.getRelatedCustomResourceID(),
+                "events.received",
+                "event",
+                event.getClass().getSimpleName());
+    }
+
+    @Override
+    public void cleanupDoneFor(ResourceID resourceID) {
+        incrementCounter(resourceID, "events.delete");
+    }
+
+    @Override
+    public void reconcileCustomResource(ResourceID resourceID, RetryInfo retryInfoNullable) {
+        Optional<RetryInfo> retryInfo = Optional.ofNullable(retryInfoNullable);
+        incrementCounter(
+                resourceID,
+                RECONCILIATIONS + "started",
+                RECONCILIATIONS + "retries.number",
+                "" + retryInfo.map(RetryInfo::getAttemptCount).orElse(0),
+                RECONCILIATIONS + "retries.last",
+                "" + retryInfo.map(RetryInfo::isLastAttempt).orElse(true));
+    }
+
+    @Override
+    public void finishedReconciliation(ResourceID resourceID) {
+        incrementCounter(resourceID, RECONCILIATIONS + "success");
+    }
+
+    public void failedReconciliation(ResourceID resourceID, Exception exception) {
+        var cause = exception.getCause();
+        if (cause == null) {
+            cause = exception;
+        } else if (cause instanceof RuntimeException) {
+            cause = cause.getCause() != null ? cause.getCause() : cause;
+        }
+        incrementCounter(
+                resourceID,
+                RECONCILIATIONS + "failed",
+                "exception",
+                cause.getClass().getSimpleName());
+    }
+
+    public <T extends Map<?, ?>> T monitorSizeOf(T map, String name) {
+        metricGroup.gauge(name + ".size", map::size);
+        return map;
+    }
+
+    private void incrementCounter(ResourceID id, String counterName, String... additionalTags) {

Review Comment:
   I think the tags should be modeled as metric groups (rather than passed as extra tag strings) so that this integrates well with the metric reporters.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java:
##########
@@ -71,36 +70,34 @@ public class FlinkOperator {
     private final Set<FlinkResourceValidator> validators;
     private final MetricGroup metricGroup;
 
+    private static final String OPERATOR_SDK_GROUP = "operator.sdk";
+
     public FlinkOperator(@Nullable Configuration conf) {
         this.client = new DefaultKubernetesClient();
         this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager();
-        this.operator =
-                new Operator(
-                        client,
-                        getConfigurationServiceOverriderConsumer(
-                                configManager.getOperatorConfiguration()));
-        this.flinkService = new FlinkService(client, configManager);
-        this.validators = ValidatorUtils.discoverValidators(configManager);
         this.metricGroup =
                 OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+        this.operator = new Operator(client, getConfigurationServiceOverriderConsumer());
+        this.flinkService = new FlinkService(client, configManager);
+        this.validators = ValidatorUtils.discoverValidators(configManager);
         PluginManager pluginManager =
                 PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
         FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
     }
 
-    @VisibleForTesting
-    protected static Consumer<ConfigurationServiceOverrider>
-            getConfigurationServiceOverriderConsumer(
-                    FlinkOperatorConfiguration operatorConfiguration) {
+    private Consumer<ConfigurationServiceOverrider> getConfigurationServiceOverriderConsumer() {
         return overrider -> {
-            int parallelism = operatorConfiguration.getReconcilerMaxParallelism();
+            int parallelism =
+                    configManager.getOperatorConfiguration().getReconcilerMaxParallelism();
             if (parallelism == -1) {
                 LOG.info("Configuring operator with unbounded reconciliation thread pool.");
                 overrider.withExecutorService(Executors.newCachedThreadPool());
             } else {
                 LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
                 overrider.withConcurrentReconciliationThreads(parallelism);
             }
+            overrider.withMetrics(
+                    new FlinkOperatorMetrics(metricGroup.addGroup(OPERATOR_SDK_GROUP)));

Review Comment:
   I think this should be configurable via a boolean option, e.g. `kubernetes.operator.josdk.metrics.enabled` (`true`/`false`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org