You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/12 09:36:20 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28480] Forward timeControllerExecution time as histogram for JOSDK Metrics interface
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new b320516 [FLINK-28480] Forward timeControllerExecution time as histogram for JOSDK Metrics interface
b320516 is described below
commit b320516e40aba988c85135a5b6510b43f289ccd1
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Tue Jul 12 17:36:14 2022 +0800
[FLINK-28480] Forward timeControllerExecution time as histogram for JOSDK Metrics interface
---
.../operator/metrics/OperatorJosdkMetrics.java | 53 ++++++++++++
.../operator/metrics/OperatorJosdkMetricsTest.java | 93 ++++++++++++++++++++++
2 files changed, 146 insertions(+)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
index c35ff53..538f01b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
@@ -19,9 +19,15 @@ package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
@@ -29,10 +35,12 @@ import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
/**
* Implementation of {@link Metrics} to monitor and forward JOSDK metrics to {@link MetricRegistry}.
@@ -43,21 +51,45 @@ public class OperatorJosdkMetrics implements Metrics {
private static final String RECONCILIATION = "Reconciliation";
private static final String RESOURCE = "Resource";
private static final String EVENT = "Event";
+    private static final int WINDOW_SIZE = 1000; // window size passed to every DescriptiveStatisticsHistogram created in histogram()
private final KubernetesOperatorMetricGroup operatorMetricGroup;
private final Configuration conf;
+    private final Clock clock; // source of relative nanos for timeControllerExecution; set to SystemClock in the constructor
private final Map<ResourceID, KubernetesResourceNamespaceMetricGroup> resourceNsMetricGroups =
new ConcurrentHashMap<>();
private final Map<ResourceID, KubernetesResourceMetricGroup> resourceMetricGroups =
new ConcurrentHashMap<>();
+    private final Map<String, Histogram> histograms = new ConcurrentHashMap<>(); // cache keyed by the dotted scope components of each histogram's group
private final Map<String, Counter> counters = new ConcurrentHashMap<>();
+    private static final Map<String, String> CONTROLLERS = // lower-cased controller name -> metric-group display name
+            Map.of(
+                    FlinkDeploymentController.class.getSimpleName().toLowerCase(),
+                    "FlinkDeployment",
+                    FlinkSessionJob.class.getSimpleName().toLowerCase(), // NOTE(review): keyed by the CRD class, unlike the controller-class key above; presumably should be FlinkSessionJobController - verify against the controllerName() JOSDK reports
+                    "FlinkSessionJob");
+
public OperatorJosdkMetrics(
KubernetesOperatorMetricGroup operatorMetricGroup, Configuration conf) {
this.operatorMetricGroup = operatorMetricGroup;
this.conf = conf;
+        this.clock = SystemClock.getInstance(); // NOTE(review): no Clock injection point, so tests must really sleep to observe non-zero timings
+    }
+
+    @Override
+    public <T> T timeControllerExecution(ControllerExecution<T> execution) throws Exception { // times execute() and records the elapsed time under controller/execution/outcome
+        long startTime = clock.relativeTimeNanos(); // only the difference to a later reading is used (see toSeconds)
+        try {
+            T result = execution.execute();
+            histogram(execution, execution.successTypeName(result)).update(toSeconds(startTime)); // NOTE(review): inside the try, so a throw from successTypeName()/histogram() is also recorded as "failed" and rethrown although execute() succeeded
+            return result;
+        } catch (Exception e) { // NOTE(review): only Exception is caught; Errors thrown by execute() propagate without a sample being recorded
+            histogram(execution, "failed").update(toSeconds(startTime));
+            throw e;
+        }
}
@Override
@@ -113,6 +145,27 @@ public class OperatorJosdkMetrics implements Metrics {
return map;
}
+    private Histogram histogram(ControllerExecution<?> execution, String name) { // returns, creating on first use, the TimeSeconds histogram under JOSDK group / controller / execution name / outcome
+        MetricGroup group = operatorMetricGroup.addGroup(OPERATOR_SDK_GROUP); // NOTE(review): the group chain is rebuilt on every call, even when the histogram is already cached below
+        for (String metricGroup :
+                Arrays.asList(
+                        CONTROLLERS.get(execution.controllerName().toLowerCase()), // NOTE(review): null for any controller not in CONTROLLERS; how addGroup(null) behaves is not visible here - verify
+                        execution.name(),
+                        name)) {
+            group = group.addGroup(metricGroup);
+        }
+        var finalGroup = group; // effectively-final copy for use inside the lambda
+        return histograms.computeIfAbsent(
+                String.join(".", group.getScopeComponents()), // cache key: dotted scope of the group built above
+                s ->
+                        finalGroup.histogram(
+                                "TimeSeconds", new DescriptiveStatisticsHistogram(WINDOW_SIZE)));
+    }
+
+    private long toSeconds(long startTime) { // whole seconds elapsed since startTime (a clock.relativeTimeNanos() reading)
+        return TimeUnit.NANOSECONDS.toSeconds(clock.relativeTimeNanos() - startTime); // NOTE(review): TimeUnit conversion truncates, so every sub-second execution is recorded as 0; consider millisecond resolution
+    }
+
private Counter counter(
MetricGroup parent, List<Tuple2<String, String>> additionalTags, String... names) {
MetricGroup group = parent.addGroup(OPERATOR_SDK_GROUP);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
index 0a719d7..dba0a82 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetricsTest.java
@@ -19,11 +19,15 @@ package org.apache.flink.kubernetes.operator.metrics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
@@ -35,15 +39,18 @@ import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
/** {@link OperatorJosdkMetrics} tests. */
public class OperatorJosdkMetricsTest {
private final ResourceID resourceId = new ResourceID("testns", "testname");
+    private final String controllerName = FlinkDeploymentController.class.getSimpleName(); // matches the FlinkDeploymentController key in OperatorJosdkMetrics.CONTROLLERS after lower-casing
private final String resourcePrefix =
"testhost.k8soperator.flink-operator-test.testopname.resource.testname.testns.JOSDK.";
private final String systemPrefix =
"testhost.k8soperator.flink-operator-test.testopname.system.";
+    private final String executionPrefix = systemPrefix + "JOSDK.FlinkDeployment."; // system scope + JOSDK group + display name mapped for controllerName
private Map<String, Metric> metrics = new HashMap<>();
private OperatorJosdkMetrics operatorMetrics;
@@ -64,6 +71,87 @@ public class OperatorJosdkMetricsTest {
new Configuration());
}
+    @Test
+    public void testTimeControllerExecution() throws Exception { // covers the success ("resource") and failure ("failed") histograms of timeControllerExecution
+        Metrics.ControllerExecution<Object> successExecution =
+                new Metrics.ControllerExecution<>() {
+                    @Override
+                    public String name() {
+                        return "reconcile";
+                    }
+
+                    @Override
+                    public String controllerName() {
+                        return controllerName;
+                    }
+
+                    @Override
+                    public String successTypeName(Object o) {
+                        return "resource";
+                    }
+
+                    @Override
+                    public Object execute() throws Exception {
+                        Thread.sleep(1000); // NOTE(review): real 1 s sleep; with six executions in this test it runs for >= 6 s
+                        return null;
+                    }
+                };
+        operatorMetrics.timeControllerExecution(successExecution);
+        assertEquals(1, metrics.size());
+        assertEquals(1, getHistogram("reconcile", "resource").getCount());
+        assertEquals(1, getHistogram("reconcile", "resource").getStatistics().getMin()); // relies on toSeconds truncating ~1 s to 1; a single run >= 2 s (e.g. loaded CI) would fail here
+        operatorMetrics.timeControllerExecution(successExecution);
+        operatorMetrics.timeControllerExecution(successExecution);
+        assertEquals(1, metrics.size()); // repeated executions reuse the cached histogram
+        assertEquals(3, getHistogram("reconcile", "resource").getCount());
+        assertEquals(1, getHistogram("reconcile", "resource").getStatistics().getMin());
+
+        Metrics.ControllerExecution<Object> failureExecution =
+                new Metrics.ControllerExecution<>() {
+                    @Override
+                    public String name() {
+                        return "cleanup";
+                    }
+
+                    @Override
+                    public String controllerName() {
+                        return controllerName;
+                    }
+
+                    @Override
+                    public String successTypeName(Object o) {
+                        return null;
+                    }
+
+                    @Override
+                    public Object execute() throws Exception {
+                        Thread.sleep(1000); // NOTE(review): real 1 s sleep, see above
+                        throw new ReconciliationException(new RuntimeException());
+                    }
+                };
+        try {
+            operatorMetrics.timeControllerExecution(failureExecution);
+            fail();
+        } catch (Exception e) {
+            assertEquals(2, metrics.size()); // a separate "failed" histogram was registered for "cleanup"
+            assertEquals(1, getHistogram("cleanup", "failed").getCount());
+            assertEquals(1, getHistogram("cleanup", "failed").getStatistics().getMin());
+        }
+        try {
+            operatorMetrics.timeControllerExecution(failureExecution);
+            fail();
+        } catch (Exception ignored) { // expected rethrow; only the recorded sample matters for this run
+        }
+        try {
+            operatorMetrics.timeControllerExecution(failureExecution);
+            fail();
+        } catch (Exception e) {
+            assertEquals(2, metrics.size());
+            assertEquals(3, getHistogram("cleanup", "failed").getCount());
+            assertEquals(1, getHistogram("cleanup", "failed").getStatistics().getMin());
+        }
+    }
+
@Test
public void testMetrics() {
operatorMetrics.failedReconciliation(resourceId, null);
@@ -113,6 +201,11 @@ public class OperatorJosdkMetricsTest {
assertEquals(2, ((Gauge<Integer>) metrics.get(systemPrefix + "mymap.size")).getValue());
}
+    private Histogram getHistogram(String... names) { // looks up executionPrefix + names joined by "." + ".TimeSeconds"
+        return ((Histogram) // a missing key yields null here, so callers fail with NullPointerException rather than an assertion message
+                metrics.get(executionPrefix + String.join(".", names) + ".TimeSeconds"));
+    }
+
private long getCount(String name) { // count of the resource-scoped counter resourcePrefix + name + ".Count"
return ((Counter) metrics.get(resourcePrefix + name + ".Count")).getCount(); // NullPointerException if the counter was never registered
}