You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/12 13:21:28 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28476] Add metrics for Kubernetes API server access
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new e8f9e03 [FLINK-28476] Add metrics for Kubernetes API server access
e8f9e03 is described below
commit e8f9e03396255efa3fab40549389a6969141d36d
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Sat Jul 9 18:15:11 2022 +0200
[FLINK-28476] Add metrics for Kubernetes API server access
---
docs/content/docs/operations/metrics-logging.md | 16 ++
.../kubernetes_operator_metric_configuration.html | 12 ++
.../flink/kubernetes/operator/FlinkOperator.java | 6 +-
.../config/FlinkOperatorConfiguration.java | 12 ++
.../operator/metrics/KubernetesClientMetrics.java | 137 +++++++++++++++++
.../metrics/KubernetesOperatorMetricOptions.java | 17 +++
.../operator/utils/KubernetesClientUtils.java | 58 +++++++
.../metrics/KubernetesClientMetricsTest.java | 170 +++++++++++++++++++++
8 files changed, 426 insertions(+), 2 deletions(-)
diff --git a/docs/content/docs/operations/metrics-logging.md b/docs/content/docs/operations/metrics-logging.md
index f6a7a9a..4769381 100644
--- a/docs/content/docs/operations/metrics-logging.md
+++ b/docs/content/docs/operations/metrics-logging.md
@@ -37,6 +37,22 @@ The Operator gathers aggregates metrics about managed resources.
| Namespace | FlinkDeployment.JmDeploymentStatus.<Status>.Count | Number of managed FlinkDeployment resources per <Status> per namespace. <Status> can take values from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
| Namespace | FlinkSessionJob.Count | Number of managed FlinkSessionJob instances per namespace | Gauge |
+## Kubernetes Client Metrics
+
+The Operator gathers various metrics related to Kubernetes API server access. The Kubernetes client metrics can be enabled by the configuration `kubernetes.operator.kubernetes.client.metrics.enabled` (default: `true`).
+
+| Scope | Metrics | Description | Type |
+|--------|----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
+| System | KubeClient.HttpRequest.Count | Number of HTTP requests sent to the Kubernetes API Server | Counter |
+| System | KubeClient.HttpRequest.<RequestMethod>.Count | Number of HTTP requests sent to the Kubernetes API Server per request method. <RequestMethod> can take values from: GET, POST, PUT, PATCH, DELETE, etc. | Counter |
+| System | KubeClient.HttpRequest.Failed.Count | Number of failed HTTP requests that have no response from the Kubernetes API Server | Counter |
+| System | KubeClient.HttpResponse.Count | Number of HTTP responses received from the Kubernetes API Server | Counter |
+| System | KubeClient.HttpResponse.<ResponseCode>.Count | Number of HTTP responses received from the Kubernetes API Server per response code. <ResponseCode> can take values from: 200, 404, 503, etc. | Counter |
+| System | KubeClient.HttpRequest.NumPerSecond | Number of HTTP requests sent to the Kubernetes API Server per second | Meter |
+| System | KubeClient.HttpRequest.Failed.NumPerSecond | Number of failed HTTP requests sent to the Kubernetes API Server per second | Meter |
+| System | KubeClient.HttpResponse.NumPerSecond | Number of HTTP responses received from the Kubernetes API Server per second | Meter |
+| System | KubeClient.HttpResponse.TimeNanos | Latency statistics obtained from the HTTP responses received from the Kubernetes API Server | Histogram |
+
## System Metrics
The Operator gathers metrics about the JVM process and exposes it similarly to core Flink [System metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#system-metrics). The list of metrics are not repeated in this document.
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
index 37ee840..22fdd1d 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
@@ -14,6 +14,18 @@
<td>Boolean</td>
<td>Enable forwarding of Java Operator SDK metrics to the Flink metric registry.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.kubernetes.client.metrics.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.metrics.histogram.sample.size</h5></td>
+ <td style="word-wrap: break-word;">1000</td>
+ <td>Integer</td>
+ <td>Defines the number of measured samples when calculating statistics.</td>
+ </tr>
<tr>
<td><h5>metrics.scope.k8soperator.resource</h5></td>
<td style="word-wrap: break-word;">"<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>"</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 3324478..996e67b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -39,11 +39,11 @@ import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReco
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.RegisteredController;
@@ -75,13 +75,15 @@ public class FlinkOperator {
private final Collection<FlinkResourceListener> listeners;
public FlinkOperator(@Nullable Configuration conf) {
- this.client = new DefaultKubernetesClient();
this.configManager =
conf != null
? new FlinkConfigManager(conf) // For testing only
: new FlinkConfigManager(this::handleNamespaceChanges);
this.metricGroup =
OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+ this.client =
+ KubernetesClientUtils.getKubernetesClient(
+ configManager.getOperatorConfiguration(), this.metricGroup);
this.operator = new Operator(client, this::overrideOperatorConfigs);
this.flinkService = new FlinkService(client, configManager);
this.validators = ValidatorUtils.discoverValidators(configManager);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 8983b15..45b9507 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -45,6 +45,8 @@ public class FlinkOperatorConfiguration {
Set<String> watchedNamespaces;
boolean dynamicNamespacesEnabled;
boolean josdkMetricsEnabled;
+ int metricsHistogramSampleSize;
+ boolean kubernetesClientMetricsEnabled;
Duration flinkCancelJobTimeout;
Duration flinkShutdownClusterTimeout;
String artifactsBaseDir;
@@ -113,6 +115,14 @@ public class FlinkOperatorConfiguration {
boolean josdkMetricsEnabled =
operatorConfig.get(KubernetesOperatorMetricOptions.OPERATOR_JOSDK_METRICS_ENABLED);
+ boolean kubernetesClientMetricsEnabled =
+ operatorConfig.get(
+ KubernetesOperatorMetricOptions.OPERATOR_KUBERNETES_CLIENT_METRICS_ENABLED);
+
+ int metricsHistogramSampleSize =
+ operatorConfig.get(
+ KubernetesOperatorMetricOptions.OPERATOR_METRICS_HISTOGRAM_SAMPLE_SIZE);
+
RetryConfiguration retryConfiguration = new FlinkOperatorRetryConfiguration(operatorConfig);
return new FlinkOperatorConfiguration(
@@ -125,6 +135,8 @@ public class FlinkOperatorConfiguration {
watchedNamespaces,
dynamicNamespacesEnabled,
josdkMetricsEnabled,
+ metricsHistogramSampleSize,
+ kubernetesClientMetricsEnabled,
flinkCancelJobTimeout,
flinkShutdownClusterTimeout,
artifactsBaseDir,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
new file mode 100644
index 0000000..1a9733d
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
+
+import okhttp3.Interceptor;
+import okhttp3.Request;
+import okhttp3.Response;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * OkHttp {@link Interceptor} that records metrics about the HTTP traffic between the Kubernetes
+ * client and the Kubernetes API server.
+ *
+ * <p>All metrics live under the {@code KubeClient} sub-group of the parent group passed to the
+ * constructor: request/response counters and meters, one counter per request method and per
+ * response code (created lazily on first use), and a latency histogram measured in nanoseconds.
+ *
+ * <p>A request counts as "failed" when {@code chain.proceed} throws, i.e. no response was
+ * obtained at all; responses with an error status code are counted as responses.
+ */
+public class KubernetesClientMetrics implements Interceptor {
+
+    public static final String KUBE_CLIENT_GROUP = "KubeClient";
+    public static final String HTTP_REQUEST_GROUP = "HttpRequest";
+    public static final String HTTP_REQUEST_FAILED_GROUP = "Failed";
+    public static final String HTTP_RESPONSE_GROUP = "HttpResponse";
+    public static final String COUNTER = "Count";
+    public static final String METER = "NumPerSecond";
+    public static final String HISTO = "TimeNanos";
+
+    /**
+     * Period between two meter updates. {@link MeterView} sizes its sample window on the
+     * assumption that {@code update()} is called once per {@code View.UPDATE_INTERVAL_SECONDS};
+     * updating more often shrinks the real window and under-reports the rate.
+     */
+    private static final long METER_UPDATE_INTERVAL_SECONDS =
+            org.apache.flink.metrics.View.UPDATE_INTERVAL_SECONDS;
+
+    private static final String METER_UPDATER_THREAD_NAME = "kubernetes-client-metrics-updater";
+
+    private final Histogram responseLatency;
+
+    private final MetricGroup requestMetricGroup;
+    private final MetricGroup failedRequestMetricGroup;
+    private final MetricGroup responseMetricGroup;
+
+    private final Counter requestCounter;
+    private final Counter failedRequestCounter;
+    private final Counter responseCounter;
+
+    private final MeterView requestRateMeter;
+    private final MeterView requestFailedRateMeter;
+    private final MeterView responseRateMeter;
+
+    private final Map<Integer, Counter> responseCodeCounters = new ConcurrentHashMap<>();
+    private final Map<String, Counter> requestMethodCounter = new ConcurrentHashMap<>();
+
+    /**
+     * Registers the Kubernetes client metrics and starts the background meter updater.
+     *
+     * @param parentGroup metric group under which the {@code KubeClient} group is created
+     * @param flinkOperatorConfiguration operator configuration; supplies the sample size of the
+     *     latency histogram
+     */
+    public KubernetesClientMetrics(
+            MetricGroup parentGroup, FlinkOperatorConfiguration flinkOperatorConfiguration) {
+        MetricGroup metricGroup = parentGroup.addGroup(KUBE_CLIENT_GROUP);
+
+        this.requestMetricGroup = metricGroup.addGroup(HTTP_REQUEST_GROUP);
+        this.failedRequestMetricGroup = requestMetricGroup.addGroup(HTTP_REQUEST_FAILED_GROUP);
+        this.responseMetricGroup = metricGroup.addGroup(HTTP_RESPONSE_GROUP);
+
+        this.requestCounter = requestMetricGroup.counter(COUNTER);
+        this.failedRequestCounter = failedRequestMetricGroup.counter(COUNTER);
+        this.responseCounter = responseMetricGroup.counter(COUNTER);
+
+        // Each meter wraps the matching counter, so markEvent() on the meter also increments
+        // the plain "Count" metric of the same group.
+        this.requestRateMeter = requestMetricGroup.meter(METER, new MeterView(requestCounter));
+        this.requestFailedRateMeter =
+                failedRequestMetricGroup.meter(METER, new MeterView(failedRequestCounter));
+        this.responseRateMeter = responseMetricGroup.meter(METER, new MeterView(responseCounter));
+
+        this.responseLatency =
+                responseMetricGroup.histogram(
+                        HISTO,
+                        new DescriptiveStatisticsHistogram(
+                                flinkOperatorConfiguration.getMetricsHistogramSampleSize()));
+
+        // Daemon thread: the updater is never shut down explicitly, so it must not be able to
+        // keep the JVM alive on its own.
+        Executors.newSingleThreadScheduledExecutor(
+                        runnable -> {
+                            Thread updater = new Thread(runnable, METER_UPDATER_THREAD_NAME);
+                            updater.setDaemon(true);
+                            return updater;
+                        })
+                .scheduleAtFixedRate(
+                        this::updateMeters, 0, METER_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Records the request, forwards it down the chain and records the outcome.
+     *
+     * @param chain the interceptor chain carrying the request
+     * @return the response returned by the rest of the chain
+     * @throws IOException if the rest of the chain fails; the request is then counted as failed
+     */
+    @Override
+    public Response intercept(Chain chain) throws IOException {
+        Request request = chain.request();
+        updateRequestMetrics(request);
+        Response response = null;
+        final long startTime = System.nanoTime();
+        try {
+            response = chain.proceed(request);
+            return response;
+        } finally {
+            // response is still null here when chain.proceed() threw.
+            updateResponseMetrics(response, startTime);
+        }
+    }
+
+    // The interceptor may run on several HTTP client threads at once, while the default Flink
+    // counters and the histogram are not thread-safe. All metric mutations are therefore
+    // serialised on this instance; the HTTP call itself stays outside the lock.
+
+    /** Counts an outgoing request, both in total and per HTTP method. */
+    private synchronized void updateRequestMetrics(Request request) {
+        this.requestRateMeter.markEvent();
+        getCounterByRequestMethod(request.method()).inc();
+    }
+
+    /**
+     * Counts the outcome of a request: a response (with latency and status code) when one was
+     * received, a failed request otherwise.
+     */
+    private synchronized void updateResponseMetrics(Response response, long startTimeNanos) {
+        final long latency = System.nanoTime() - startTimeNanos;
+        if (response != null) {
+            this.responseRateMeter.markEvent();
+            this.responseLatency.update(latency);
+            getCounterByResponseCode(response.code()).inc();
+        } else {
+            this.requestFailedRateMeter.markEvent();
+        }
+    }
+
+    /** Returns the per-method request counter, registering it on first use. */
+    private Counter getCounterByRequestMethod(String method) {
+        return requestMethodCounter.computeIfAbsent(
+                method, key -> requestMetricGroup.addGroup(key).counter(COUNTER));
+    }
+
+    /** Returns the per-status-code response counter, registering it on first use. */
+    private Counter getCounterByResponseCode(int code) {
+        return responseCodeCounters.computeIfAbsent(
+                code, key -> responseMetricGroup.addGroup(key).counter(COUNTER));
+    }
+
+    /** Periodic task that lets every meter recompute its current rate. */
+    private synchronized void updateMeters() {
+        this.requestRateMeter.update();
+        this.requestFailedRateMeter.update();
+        this.responseRateMeter.update();
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index 924fc4c..ff6bed7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -30,8 +30,23 @@ public class KubernetesOperatorMetricOptions {
.withDescription(
"Enable forwarding of Java Operator SDK metrics to the Flink metric registry.");
+ public static final ConfigOption<Boolean> OPERATOR_KUBERNETES_CLIENT_METRICS_ENABLED =
+ ConfigOptions.key("kubernetes.operator.kubernetes.client.metrics.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server.");
+
+ public static final ConfigOption<Integer> OPERATOR_METRICS_HISTOGRAM_SAMPLE_SIZE =
+ ConfigOptions.key("kubernetes.operator.metrics.histogram.sample.size")
+ .intType()
+ .defaultValue(1000)
+ .withDescription(
+ "Defines the number of measured samples when calculating statistics.");
+
public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR =
ConfigOptions.key("metrics.scope.k8soperator.system")
+ .stringType()
.defaultValue("<host>.k8soperator.<namespace>.<name>.system")
.withDeprecatedKeys("metrics.scope.k8soperator")
.withDescription(
@@ -39,12 +54,14 @@ public class KubernetesOperatorMetricOptions {
public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCENS =
ConfigOptions.key("metrics.scope.k8soperator.resourcens")
+ .stringType()
.defaultValue("<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>")
.withDescription(
"Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource namespace.");
public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCE =
ConfigOptions.key("metrics.scope.k8soperator.resource")
+ .stringType()
.defaultValue(
"<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>")
.withDescription(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
new file mode 100644
index 0000000..46363b5
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics;
+import org.apache.flink.metrics.MetricGroup;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory;
+import io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl;
+
+/** Factory helpers for creating the {@link KubernetesClient} used by the operator. */
+public class KubernetesClientUtils {
+
+    /**
+     * Creates a Kubernetes client using the configuration of a freshly constructed {@link
+     * DefaultKubernetesClient}.
+     *
+     * @param operatorConfig operator configuration deciding whether client metrics are enabled
+     * @param metricGroup metric group handed to the client metrics when they are enabled
+     * @return the Kubernetes client
+     */
+    public static KubernetesClient getKubernetesClient(
+            FlinkOperatorConfiguration operatorConfig, MetricGroup metricGroup) {
+        Config defaultClientConfig = new DefaultKubernetesClient().getConfiguration();
+        return getKubernetesClient(operatorConfig, metricGroup, defaultClientConfig);
+    }
+
+    /**
+     * Creates a Kubernetes client for the given client configuration. When client metrics are
+     * enabled in the operator configuration, a {@link KubernetesClientMetrics} interceptor is
+     * added to the underlying OkHttp client.
+     *
+     * @param operatorConfig operator configuration deciding whether client metrics are enabled
+     * @param metricGroup metric group handed to the client metrics when they are enabled
+     * @param kubernetesClientConfig configuration the HTTP client and the returned client use
+     * @return the Kubernetes client
+     */
+    @VisibleForTesting
+    public static KubernetesClient getKubernetesClient(
+            FlinkOperatorConfiguration operatorConfig,
+            MetricGroup metricGroup,
+            Config kubernetesClientConfig) {
+        var baseOkHttpClient =
+                new OkHttpClientFactory().createHttpClient(kubernetesClientConfig).getOkHttpClient();
+        var httpClientBuilder = baseOkHttpClient.newBuilder();
+
+        if (operatorConfig.isKubernetesClientMetricsEnabled()) {
+            var metricsInterceptor = new KubernetesClientMetrics(metricGroup, operatorConfig);
+            httpClientBuilder.addInterceptor(metricsInterceptor);
+        }
+
+        var wrappedHttpClient = new OkHttpClientImpl(httpClientBuilder.build());
+        return new DefaultKubernetesClient(wrappedHttpClient, kubernetesClientConfig);
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
new file mode 100644
index 0000000..70ef11d
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.COUNTER;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.HISTO;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.HTTP_REQUEST_FAILED_GROUP;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.HTTP_REQUEST_GROUP;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.HTTP_RESPONSE_GROUP;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.KUBE_CLIENT_GROUP;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.METER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * {@link KubernetesClientMetrics} tests.
+ *
+ * <p>The tests run in a fixed order: the disabled case first, then the enabled case, and the
+ * API-server-down case last because it shuts the mock server down.
+ */
+@EnableKubernetesMockClient(crud = true)
+@TestMethodOrder(OrderAnnotation.class)
+public class KubernetesClientMetricsTest {
+    private KubernetesMockServer mockServer;
+    private final MetricListener listener = new MetricListener();
+
+    private static final String REQUEST_COUNTER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_REQUEST_GROUP, COUNTER);
+    private static final String REQUEST_METER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_REQUEST_GROUP, METER);
+    private static final String REQUEST_FAILED_METER_ID =
+            String.join(
+                    ".", KUBE_CLIENT_GROUP, HTTP_REQUEST_GROUP, HTTP_REQUEST_FAILED_GROUP, METER);
+    private static final String REQUEST_POST_COUNTER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_REQUEST_GROUP, "POST", COUNTER);
+    private static final String REQUEST_DELETE_COUNTER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_REQUEST_GROUP, "DELETE", COUNTER);
+    private static final String REQUEST_FAILED_COUNTER_ID =
+            String.join(
+                    ".", KUBE_CLIENT_GROUP, HTTP_REQUEST_GROUP, HTTP_REQUEST_FAILED_GROUP, COUNTER);
+    private static final String RESPONSE_COUNTER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, COUNTER);
+    private static final String RESPONSE_METER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, METER);
+    private static final String RESPONSE_200_COUNTER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, "200", COUNTER);
+    private static final String RESPONSE_404_COUNTER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, "404", COUNTER);
+    private static final String RESPONSE_LATENCY_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, HISTO);
+
+    /** With metrics disabled, a request must not register any of the client metrics. */
+    @Test
+    @Order(1)
+    public void testMetricsDisabled() {
+        var deployment = TestUtils.buildApplicationCluster();
+        KubernetesClient noMetricsClient = getKubernetesClient(false);
+        noMetricsClient.resource(deployment).get();
+        assertFalse(listener.getCounter(REQUEST_COUNTER_ID).isPresent());
+        assertFalse(listener.getMeter(REQUEST_METER_ID).isPresent());
+        assertFalse(listener.getCounter(REQUEST_FAILED_COUNTER_ID).isPresent());
+        assertFalse(listener.getMeter(REQUEST_FAILED_METER_ID).isPresent());
+        assertFalse(listener.getCounter(RESPONSE_COUNTER_ID).isPresent());
+        assertFalse(listener.getMeter(RESPONSE_METER_ID).isPresent());
+        assertFalse(listener.getHistogram(RESPONSE_LATENCY_ID).isPresent());
+    }
+
+    /**
+     * With metrics enabled, all metrics start at zero and are updated by create/delete calls,
+     * including the per-method and per-response-code counters.
+     */
+    @Test
+    @Order(2)
+    public void testMetricsEnabled() {
+        KubernetesClient kubernetesClient = getKubernetesClient(true);
+        var deployment = TestUtils.buildApplicationCluster();
+        assertEquals(0, listener.getCounter(REQUEST_COUNTER_ID).get().getCount());
+        assertEquals(0.0, listener.getMeter(REQUEST_METER_ID).get().getRate());
+        assertEquals(0, listener.getCounter(REQUEST_FAILED_COUNTER_ID).get().getCount());
+        assertEquals(0.0, listener.getMeter(REQUEST_FAILED_METER_ID).get().getRate());
+        assertEquals(0, listener.getCounter(RESPONSE_COUNTER_ID).get().getCount());
+        assertEquals(0.0, listener.getMeter(RESPONSE_METER_ID).get().getRate());
+        assertEquals(0, listener.getHistogram(RESPONSE_LATENCY_ID).get().getStatistics().getMin());
+        assertEquals(0, listener.getHistogram(RESPONSE_LATENCY_ID).get().getStatistics().getMax());
+
+        kubernetesClient.resource(deployment).createOrReplace();
+        assertEquals(1, listener.getCounter(REQUEST_COUNTER_ID).get().getCount());
+        assertEquals(1, listener.getCounter(REQUEST_POST_COUNTER_ID).get().getCount());
+        assertEquals(1, listener.getCounter(RESPONSE_COUNTER_ID).get().getCount());
+        assertEquals(1, listener.getCounter(RESPONSE_200_COUNTER_ID).get().getCount());
+        assertTrue(listener.getHistogram(RESPONSE_LATENCY_ID).get().getStatistics().getMin() > 0);
+        assertTrue(listener.getHistogram(RESPONSE_LATENCY_ID).get().getStatistics().getMax() > 0);
+
+        kubernetesClient.resource(deployment).delete();
+        assertEquals(1, listener.getCounter(REQUEST_DELETE_COUNTER_ID).get().getCount());
+
+        // Deleting the already deleted resource is expected to yield a 404 response.
+        kubernetesClient.resource(deployment).delete();
+        assertEquals(2, listener.getCounter(REQUEST_DELETE_COUNTER_ID).get().getCount());
+        assertEquals(1, listener.getCounter(RESPONSE_404_COUNTER_ID).get().getCount());
+
+        // Meters are refreshed by a background task, so keep generating traffic until the
+        // reported rates become visible.
+        Awaitility.await()
+                .atMost(1, TimeUnit.MINUTES)
+                .until(
+                        () -> {
+                            kubernetesClient.resource(deployment).createOrReplace();
+                            return listener.getMeter(REQUEST_METER_ID).get().getRate() > 0.1
+                                    && listener.getMeter(RESPONSE_METER_ID).get().getRate() > 0.1;
+                        });
+    }
+
+    /** Once the mock API server is shut down, requests must be counted as failed. */
+    @Test
+    @Order(3)
+    public void testAPIServerIsDown() {
+        var deployment = TestUtils.buildApplicationCluster();
+        KubernetesClient kubernetesClient = getKubernetesClient(true);
+        mockServer.shutdown();
+        assertEquals(0, listener.getCounter(REQUEST_FAILED_COUNTER_ID).get().getCount());
+        assertEquals(0.0, listener.getMeter(REQUEST_FAILED_METER_ID).get().getRate());
+        Awaitility.await()
+                .atMost(1, TimeUnit.MINUTES)
+                .until(
+                        () -> {
+                            assertThrows(
+                                    KubernetesClientException.class,
+                                    () -> kubernetesClient.resource(deployment).createOrReplace());
+                            return listener.getCounter(REQUEST_FAILED_COUNTER_ID).get().getCount()
+                                            > 0
+                                    && listener.getMeter(REQUEST_FAILED_METER_ID).get().getRate()
+                                            > 0.1;
+                        });
+    }
+
+    /**
+     * Creates a client against the mock server, reporting into the test's metric listener.
+     *
+     * @param enableMetrics value of the Kubernetes client metrics switch
+     */
+    private KubernetesClient getKubernetesClient(boolean enableMetrics) {
+        var configuration = new Configuration();
+        configuration.setBoolean(
+                KubernetesOperatorMetricOptions.OPERATOR_KUBERNETES_CLIENT_METRICS_ENABLED,
+                enableMetrics);
+        var configManager = new FlinkConfigManager(configuration);
+        return KubernetesClientUtils.getKubernetesClient(
+                configManager.getOperatorConfiguration(),
+                listener.getMetricGroup(),
+                mockServer.createClient().getConfiguration());
+    }
+}