You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/12 09:36:20 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28480] Forward timeControllerExecution time as histogram for JOSDK Metrics interface

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new b320516  [FLINK-28480] Forward timeControllerExecution time as histogram for JOSDK Metrics interface
b320516 is described below

commit b320516e40aba988c85135a5b6510b43f289ccd1
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Tue Jul 12 17:36:14 2022 +0800

    [FLINK-28480] Forward timeControllerExecution time as histogram for JOSDK Metrics interface
---
 .../operator/metrics/OperatorJosdkMetrics.java     | 53 ++++++++++++
 .../operator/metrics/OperatorJosdkMetricsTest.java | 93 ++++++++++++++++++++++
 2 files changed, 146 insertions(+)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
index c35ff53..538f01b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
@@ -19,9 +19,15 @@ package org.apache.flink.kubernetes.operator.metrics;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
 
 import io.javaoperatorsdk.operator.api.monitoring.Metrics;
 import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
@@ -29,10 +35,12 @@ import io.javaoperatorsdk.operator.processing.event.Event;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Implementation of {@link Metrics} to monitor and forward JOSDK metrics to {@link MetricRegistry}.
@@ -43,21 +51,45 @@ public class OperatorJosdkMetrics implements Metrics {
     private static final String RECONCILIATION = "Reconciliation";
     private static final String RESOURCE = "Resource";
     private static final String EVENT = "Event";
+    private static final int WINDOW_SIZE = 1000;
 
     private final KubernetesOperatorMetricGroup operatorMetricGroup;
     private final Configuration conf;
+    private final Clock clock;
 
     private final Map<ResourceID, KubernetesResourceNamespaceMetricGroup> resourceNsMetricGroups =
             new ConcurrentHashMap<>();
     private final Map<ResourceID, KubernetesResourceMetricGroup> resourceMetricGroups =
             new ConcurrentHashMap<>();
 
+    private final Map<String, Histogram> histograms = new ConcurrentHashMap<>();
     private final Map<String, Counter> counters = new ConcurrentHashMap<>();
 
+    /**
+     * Maps a lower-cased class simple name to the metric group name used for that resource
+     * kind when the histogram scope is built.
+     *
+     * <p>NOTE(review): lookups are done with the lower-cased {@code controllerName()} of the
+     * execution, but the second key is derived from {@code FlinkSessionJob} rather than from a
+     * controller class, so it presumably never matches a session job controller's name --
+     * confirm the intended key.
+     */
+    private static final Map<String, String> CONTROLLERS =
+            Map.of(
+                    FlinkDeploymentController.class.getSimpleName().toLowerCase(),
+                    "FlinkDeployment",
+                    FlinkSessionJob.class.getSimpleName().toLowerCase(),
+                    "FlinkSessionJob");
+
+    /**
+     * Creates the JOSDK metrics forwarder.
+     *
+     * @param operatorMetricGroup operator-level metric group kept for registering metrics
+     * @param conf configuration kept on this instance
+     */
     public OperatorJosdkMetrics(
             KubernetesOperatorMetricGroup operatorMetricGroup, Configuration conf) {
         this.operatorMetricGroup = operatorMetricGroup;
         this.conf = conf;
+        // Execution durations are measured against the system clock instance.
+        this.clock = SystemClock.getInstance();
+    }
+
+    /**
+     * Runs the given controller execution and records its duration, in whole seconds, in a
+     * histogram scoped by controller, execution name and outcome.
+     *
+     * <p>Successful runs are recorded under {@code execution.successTypeName(result)}. Runs
+     * whose {@code execute()} throws an {@link Exception} are recorded under {@code "failed"}
+     * and the exception is rethrown unchanged.
+     *
+     * @param execution the controller execution to run and time
+     * @return the value returned by {@code execution.execute()}
+     * @throws Exception any exception thrown by {@code execution.execute()}
+     */
+    @Override
+    public <T> T timeControllerExecution(ControllerExecution<T> execution) throws Exception {
+        long startTime = clock.relativeTimeNanos();
+        T result;
+        try {
+            result = execution.execute();
+        } catch (Exception e) {
+            histogram(execution, "failed").update(toSeconds(startTime));
+            throw e;
+        }
+        // Recorded outside the try block so that an error raised while updating the success
+        // histogram is not additionally counted as a failed controller execution.
+        histogram(execution, execution.successTypeName(result)).update(toSeconds(startTime));
+        return result;
     }
 
     @Override
@@ -113,6 +145,27 @@ public class OperatorJosdkMetrics implements Metrics {
         return map;
     }
 
+    /**
+     * Returns the {@code TimeSeconds} histogram for the given execution and outcome, creating
+     * and registering it on first use.
+     *
+     * <p>The histogram lives under the operator metric group in the nested groups {@code
+     * OPERATOR_SDK_GROUP / <controller> / <execution name> / <name>}.
+     *
+     * @param execution the execution whose controller and name select the metric group
+     * @param name the outcome group, e.g. a success type name or {@code "failed"}
+     * @return the cached or newly registered histogram
+     */
+    private Histogram histogram(ControllerExecution<?> execution, String name) {
+        String controller = CONTROLLERS.get(execution.controllerName().toLowerCase());
+        if (controller == null) {
+            // Unmapped controller: use its own name instead of passing null as a group name.
+            controller = execution.controllerName();
+        }
+        List<String> path = Arrays.asList(controller, execution.name(), name);
+        // The cache key mirrors the group path, so the metric groups only have to be
+        // resolved when the histogram is not cached yet.
+        return histograms.computeIfAbsent(
+                String.join(".", path),
+                key -> {
+                    MetricGroup group = operatorMetricGroup.addGroup(OPERATOR_SDK_GROUP);
+                    for (String metricGroup : path) {
+                        group = group.addGroup(metricGroup);
+                    }
+                    return group.histogram(
+                            "TimeSeconds", new DescriptiveStatisticsHistogram(WINDOW_SIZE));
+                });
+    }
+
+    /**
+     * Returns the time elapsed since {@code startTime} in whole seconds.
+     *
+     * <p>The conversion goes through {@link TimeUnit#toSeconds(long)}, which truncates, so any
+     * duration shorter than one second is reported as {@code 0}.
+     *
+     * @param startTime an earlier reading of {@code clock.relativeTimeNanos()}
+     * @return the elapsed time in seconds
+     */
+    private long toSeconds(long startTime) {
+        long elapsedNanos = clock.relativeTimeNanos() - startTime;
+        return TimeUnit.NANOSECONDS.toSeconds(elapsedNanos);
+    }
+
     private Counter counter(
             MetricGroup parent, List<Tuple2<String, String>> additionalTags, String... names) {
         MetricGroup group = parent.addGroup(OPERATOR_SDK_GROUP);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
index 0a719d7..dba0a82 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
@@ -19,11 +19,15 @@ package org.apache.flink.kubernetes.operator.metrics;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 
+import io.javaoperatorsdk.operator.api.monitoring.Metrics;
 import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
@@ -35,15 +39,18 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** {@link OperatorJosdkMetrics} tests. */
 public class OperatorJosdkMetricsTest {
 
     private final ResourceID resourceId = new ResourceID("testns", "testname");
+    private final String controllerName = FlinkDeploymentController.class.getSimpleName();
     private final String resourcePrefix =
             "testhost.k8soperator.flink-operator-test.testopname.resource.testname.testns.JOSDK.";
     private final String systemPrefix =
             "testhost.k8soperator.flink-operator-test.testopname.system.";
+    private final String executionPrefix = systemPrefix + "JOSDK.FlinkDeployment.";
 
     private Map<String, Metric> metrics = new HashMap<>();
     private OperatorJosdkMetrics operatorMetrics;
@@ -64,6 +71,87 @@ public class OperatorJosdkMetricsTest {
                         new Configuration());
     }
 
+    /**
+     * Verifies that timed controller executions end up in per-outcome histograms: one for a
+     * successful {@code reconcile} execution and one for a failing {@code cleanup} execution.
+     * Each execution sleeps for one second, so every recorded value is expected to be 1.
+     */
+    @Test
+    public void testTimeControllerExecution() throws Exception {
+        // Execution that succeeds after ~1s and reports the success type "resource".
+        Metrics.ControllerExecution<Object> successExecution =
+                new Metrics.ControllerExecution<>() {
+                    @Override
+                    public String name() {
+                        return "reconcile";
+                    }
+
+                    @Override
+                    public String controllerName() {
+                        return controllerName;
+                    }
+
+                    @Override
+                    public String successTypeName(Object o) {
+                        return "resource";
+                    }
+
+                    @Override
+                    public Object execute() throws Exception {
+                        Thread.sleep(1000);
+                        return null;
+                    }
+                };
+        // First run registers exactly one metric: the reconcile/resource histogram.
+        operatorMetrics.timeControllerExecution(successExecution);
+        assertEquals(1, metrics.size());
+        assertEquals(1, getHistogram("reconcile", "resource").getCount());
+        assertEquals(1, getHistogram("reconcile", "resource").getStatistics().getMin());
+        // Further runs reuse the same histogram instead of registering new metrics.
+        operatorMetrics.timeControllerExecution(successExecution);
+        operatorMetrics.timeControllerExecution(successExecution);
+        assertEquals(1, metrics.size());
+        assertEquals(3, getHistogram("reconcile", "resource").getCount());
+        assertEquals(1, getHistogram("reconcile", "resource").getStatistics().getMin());
+
+        // Execution that fails after ~1s; successTypeName is not expected to be consulted.
+        Metrics.ControllerExecution<Object> failureExecution =
+                new Metrics.ControllerExecution<>() {
+                    @Override
+                    public String name() {
+                        return "cleanup";
+                    }
+
+                    @Override
+                    public String controllerName() {
+                        return controllerName;
+                    }
+
+                    @Override
+                    public String successTypeName(Object o) {
+                        return null;
+                    }
+
+                    @Override
+                    public Object execute() throws Exception {
+                        Thread.sleep(1000);
+                        throw new ReconciliationException(new RuntimeException());
+                    }
+                };
+        // The exception must propagate, and the run is recorded in the cleanup/failed histogram.
+        try {
+            operatorMetrics.timeControllerExecution(failureExecution);
+            fail();
+        } catch (Exception e) {
+            assertEquals(2, metrics.size());
+            assertEquals(1, getHistogram("cleanup", "failed").getCount());
+            assertEquals(1, getHistogram("cleanup", "failed").getStatistics().getMin());
+        }
+        // Second failing run: only the propagation of the exception is checked here.
+        try {
+            operatorMetrics.timeControllerExecution(failureExecution);
+            fail();
+        } catch (Exception ignored) {
+        }
+        // Third failing run: still two metrics in total, with three recorded failures.
+        try {
+            operatorMetrics.timeControllerExecution(failureExecution);
+            fail();
+        } catch (Exception e) {
+            assertEquals(2, metrics.size());
+            assertEquals(3, getHistogram("cleanup", "failed").getCount());
+            assertEquals(1, getHistogram("cleanup", "failed").getStatistics().getMin());
+        }
+    }
+
     @Test
     public void testMetrics() {
         operatorMetrics.failedReconciliation(resourceId, null);
@@ -113,6 +201,11 @@ public class OperatorJosdkMetricsTest {
         assertEquals(2, ((Gauge<Integer>) metrics.get(systemPrefix + "mymap.size")).getValue());
     }
 
+    /**
+     * Looks up the {@code TimeSeconds} histogram registered under the execution prefix for the
+     * given group names, which are joined with dots.
+     */
+    private Histogram getHistogram(String... names) {
+        String identifier = executionPrefix + String.join(".", names) + ".TimeSeconds";
+        return (Histogram) metrics.get(identifier);
+    }
+
     private long getCount(String name) {
         return ((Counter) metrics.get(resourcePrefix + name + ".Count")).getCount();
     }