Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/12 13:21:28 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28476] Add metrics for Kubernetes API server access

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new e8f9e03  [FLINK-28476] Add metrics for Kubernetes API server access
e8f9e03 is described below

commit e8f9e03396255efa3fab40549389a6969141d36d
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Sat Jul 9 18:15:11 2022 +0200

    [FLINK-28476] Add metrics for Kubernetes API server access
---
 docs/content/docs/operations/metrics-logging.md    |  16 ++
 .../kubernetes_operator_metric_configuration.html  |  12 ++
 .../flink/kubernetes/operator/FlinkOperator.java   |   6 +-
 .../config/FlinkOperatorConfiguration.java         |  12 ++
 .../operator/metrics/KubernetesClientMetrics.java  | 137 +++++++++++++++++
 .../metrics/KubernetesOperatorMetricOptions.java   |  17 +++
 .../operator/utils/KubernetesClientUtils.java      |  58 +++++++
 .../metrics/KubernetesClientMetricsTest.java       | 170 +++++++++++++++++++++
 8 files changed, 426 insertions(+), 2 deletions(-)

diff --git a/docs/content/docs/operations/metrics-logging.md b/docs/content/docs/operations/metrics-logging.md
index f6a7a9a..4769381 100644
--- a/docs/content/docs/operations/metrics-logging.md
+++ b/docs/content/docs/operations/metrics-logging.md
@@ -37,6 +37,22 @@ The Operator gathers aggregates metrics about managed resources.
 | Namespace | FlinkDeployment.JmDeploymentStatus.<Status>.Count | Number of managed FlinkDeployment resources per <Status> per namespace. <Status> can take values from: READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR | Gauge |
 | Namespace | FlinkSessionJob.Count                             | Number of managed FlinkSessionJob instances per namespace                                                                                                   | Gauge |
 
+## Kubernetes Client Metrics
+
+The Operator gathers metrics about its access to the Kubernetes API server. They are controlled by the `kubernetes.operator.kubernetes.client.metrics.enabled` option, which defaults to `true`; set it to `false` to disable them.
+
+| Scope  | Metrics                                      | Description                                                                                                                                            | Type      |
+|--------|----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
+| System | KubeClient.HttpRequest.Count                 | Number of HTTP requests sent to the Kubernetes API Server                                                                                              | Counter   |
+| System | KubeClient.HttpRequest.<RequestMethod>.Count | Number of HTTP requests sent to the Kubernetes API Server per request method. <RequestMethod> is the HTTP method, e.g. GET, POST, PUT, PATCH, DELETE.  | Counter   |
+| System | KubeClient.HttpRequest.Failed.Count          | Number of failed HTTP requests that received no response from the Kubernetes API Server                                                                | Counter   |
+| System | KubeClient.HttpResponse.Count                | Number of HTTP responses received from the Kubernetes API Server                                                                                       | Counter   |
+| System | KubeClient.HttpResponse.<ResponseCode>.Count | Number of HTTP responses received from the Kubernetes API Server per response code. <ResponseCode> is the HTTP status code, e.g. 200, 404, 503.        | Counter   |
+| System | KubeClient.HttpRequest.NumPerSecond          | Number of HTTP requests sent to the Kubernetes API Server per second                                                                                   | Meter     |
+| System | KubeClient.HttpRequest.Failed.NumPerSecond   | Number of failed HTTP requests sent to the Kubernetes API Server per second                                                                            | Meter     |
+| System | KubeClient.HttpResponse.NumPerSecond         | Number of HTTP responses received from the Kubernetes API Server per second                                                                            | Meter     |
+| System | KubeClient.HttpResponse.TimeNanos            | Latency statistics obtained from the HTTP responses received from the Kubernetes API Server                                                            | Histogram |
+
 ## System Metrics
 The Operator gathers metrics about the JVM process and exposes them similarly to core Flink [System metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#system-metrics). The list of metrics is not repeated in this document.
 
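The Kubernetes client metrics table above implies a simple bookkeeping rule per call: every request increments `HttpRequest.Count` and its per-method counter; a call that returns any status code, including 404, counts as a response; only a call that gets no response at all counts as failed. The following dependency-free Java sketch models that rule with plain `LongAdder` counters keyed by the documented metric names. `KubeClientCountersSketch` and `record` are hypothetical names for illustration; the operator itself registers Flink `Counter`, `Meter` and `Histogram` objects inside an OkHttp interceptor.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical model of the KubeClient counters documented above; not operator code.
final class KubeClientCountersSketch {
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    private void inc(String name) {
        // Per-method and per-status-code counters are created lazily on first use.
        counters.computeIfAbsent(name, k -> new LongAdder()).increment();
    }

    long get(String name) {
        LongAdder adder = counters.get(name);
        return adder == null ? 0L : adder.sum();
    }

    /** Records one call; `call` returns the HTTP status code or throws if no response arrives. */
    int record(String method, Callable<Integer> call) throws Exception {
        inc("KubeClient.HttpRequest.Count");
        inc("KubeClient.HttpRequest." + method + ".Count");
        int code;
        try {
            code = call.call();
        } catch (Exception e) {
            // No response at all (e.g. connection refused): counted as a failed request.
            inc("KubeClient.HttpRequest.Failed.Count");
            throw e;
        }
        // Any status code, including 4xx and 5xx, counts as a received response.
        inc("KubeClient.HttpResponse.Count");
        inc("KubeClient.HttpResponse." + code + ".Count");
        return code;
    }
}
```

Given a POST answered with 200, a DELETE answered with 404 and a GET whose connection fails, the sketch ends with three requests, two responses, one `404.Count` and one `Failed.Count`. This mirrors how `updateResponseMetrics` treats a `null` response as the only failure case.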
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
index 37ee840..22fdd1d 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
@@ -14,6 +14,18 @@
             <td>Boolean</td>
             <td>Enable forwarding of Java Operator SDK metrics to the Flink metric registry.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.kubernetes.client.metrics.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.metrics.histogram.sample.size</h5></td>
+            <td style="word-wrap: break-word;">1000</td>
+            <td>Integer</td>
+            <td>Defines the number of measured samples used when calculating histogram statistics.</td>
+        </tr>
         <tr>
             <td><h5>metrics.scope.k8soperator.resource</h5></td>
             <td style="word-wrap: break-word;">"&lt;host&gt;.k8soperator.&lt;namespace&gt;.&lt;name&gt;.resource.&lt;resourcens&gt;.&lt;resourcename&gt;"</td>
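The `kubernetes.operator.metrics.histogram.sample.size` option bounds how many latency measurements the `KubeClient.HttpResponse.TimeNanos` histogram keeps: `KubernetesClientMetrics` passes the value to Flink's `DescriptiveStatisticsHistogram` as its window size. Below is a minimal, dependency-free sketch of such a bounded window. It assumes a ring buffer in which each new sample overwrites the oldest once the window is full. `SampleWindowSketch` is a hypothetical name, not a Flink class.

```java
import java.util.Arrays;

// Hypothetical bounded sample window; not Flink's DescriptiveStatisticsHistogram.
final class SampleWindowSketch {
    private final long[] samples;
    private int next = 0; // slot the next sample will overwrite
    private int size = 0; // number of valid samples, at most samples.length

    SampleWindowSketch(int sampleSize) {
        if (sampleSize <= 0) {
            throw new IllegalArgumentException("sampleSize must be positive");
        }
        this.samples = new long[sampleSize];
    }

    void update(long latencyNanos) {
        samples[next] = latencyNanos;
        next = (next + 1) % samples.length;
        size = Math.min(size + 1, samples.length);
    }

    // Statistics cover only the valid samples; an empty window reports 0.
    long min() {
        return Arrays.stream(samples, 0, size).min().orElse(0L);
    }

    long max() {
        return Arrays.stream(samples, 0, size).max().orElse(0L);
    }

    double mean() {
        return Arrays.stream(samples, 0, size).average().orElse(0.0);
    }
}
```

With a window of 3 and the samples 100, 5, 7 and 9, the first sample is evicted, so min, max and mean are 5, 9 and 7.0. A larger sample size smooths the statistics but reacts more slowly to a change in API server latency.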
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 3324478..996e67b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -39,11 +39,11 @@ import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReco
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
 import io.javaoperatorsdk.operator.RegisteredController;
@@ -75,13 +75,15 @@ public class FlinkOperator {
     private final Collection<FlinkResourceListener> listeners;
 
     public FlinkOperator(@Nullable Configuration conf) {
-        this.client = new DefaultKubernetesClient();
         this.configManager =
                 conf != null
                         ? new FlinkConfigManager(conf) // For testing only
                         : new FlinkConfigManager(this::handleNamespaceChanges);
         this.metricGroup =
                 OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+        this.client =
+                KubernetesClientUtils.getKubernetesClient(
+                        configManager.getOperatorConfiguration(), this.metricGroup);
         this.operator = new Operator(client, this::overrideOperatorConfigs);
         this.flinkService = new FlinkService(client, configManager);
         this.validators = ValidatorUtils.discoverValidators(configManager);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 8983b15..45b9507 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -45,6 +45,8 @@ public class FlinkOperatorConfiguration {
     Set<String> watchedNamespaces;
     boolean dynamicNamespacesEnabled;
     boolean josdkMetricsEnabled;
+    int metricsHistogramSampleSize;
+    boolean kubernetesClientMetricsEnabled;
     Duration flinkCancelJobTimeout;
     Duration flinkShutdownClusterTimeout;
     String artifactsBaseDir;
@@ -113,6 +115,14 @@ public class FlinkOperatorConfiguration {
         boolean josdkMetricsEnabled =
                 operatorConfig.get(KubernetesOperatorMetricOptions.OPERATOR_JOSDK_METRICS_ENABLED);
 
+        boolean kubernetesClientMetricsEnabled =
+                operatorConfig.get(
+                        KubernetesOperatorMetricOptions.OPERATOR_KUBERNETES_CLIENT_METRICS_ENABLED);
+
+        int metricsHistogramSampleSize =
+                operatorConfig.get(
+                        KubernetesOperatorMetricOptions.OPERATOR_METRICS_HISTOGRAM_SAMPLE_SIZE);
+
         RetryConfiguration retryConfiguration = new FlinkOperatorRetryConfiguration(operatorConfig);
 
         return new FlinkOperatorConfiguration(
@@ -125,6 +135,8 @@ public class FlinkOperatorConfiguration {
                 watchedNamespaces,
                 dynamicNamespacesEnabled,
                 josdkMetricsEnabled,
+                metricsHistogramSampleSize,
+                kubernetesClientMetricsEnabled,
                 flinkCancelJobTimeout,
                 flinkShutdownClusterTimeout,
                 artifactsBaseDir,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
new file mode 100644
index 0000000..1a9733d
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
+
+import okhttp3.Interceptor;
+import okhttp3.Request;
+import okhttp3.Response;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/** Kubernetes client metrics. */
+public class KubernetesClientMetrics implements Interceptor {
+
+    public static final String KUBE_CLIENT_GROUP = "KubeClient";
+    public static final String HTTP_REQUEST_GROUP = "HttpRequest";
+    public static final String HTTP_REQUEST_FAILED_GROUP = "Failed";
+    public static final String HTTP_RESPONSE_GROUP = "HttpResponse";
+    public static final String COUNTER = "Count";
+    public static final String METER = "NumPerSecond";
+    public static final String HISTO = "TimeNanos";
+    private final Histogram responseLatency;
+
+    private final MetricGroup requestMetricGroup;
+    private final MetricGroup failedRequestMetricGroup;
+    private final MetricGroup responseMetricGroup;
+
+    private final Counter requestCounter;
+    private final Counter failedRequestCounter;
+    private final Counter responseCounter;
+
+    private final MeterView requestRateMeter;
+    private final MeterView requestFailedRateMeter;
+    private final MeterView responseRateMeter;
+
+    private final Map<Integer, Counter> responseCodeCounters = new ConcurrentHashMap<>();
+    private final Map<String, Counter> requestMethodCounter = new ConcurrentHashMap<>();
+
+    public KubernetesClientMetrics(
+            MetricGroup parentGroup, FlinkOperatorConfiguration flinkOperatorConfiguration) {
+        MetricGroup metricGroup = parentGroup.addGroup(KUBE_CLIENT_GROUP);
+
+        this.requestMetricGroup = metricGroup.addGroup(HTTP_REQUEST_GROUP);
+        this.failedRequestMetricGroup = requestMetricGroup.addGroup(HTTP_REQUEST_FAILED_GROUP);
+        this.responseMetricGroup = metricGroup.addGroup(HTTP_RESPONSE_GROUP);
+
+        this.requestCounter = requestMetricGroup.counter(COUNTER);
+        this.failedRequestCounter = failedRequestMetricGroup.counter(COUNTER);
+        this.responseCounter = responseMetricGroup.counter(COUNTER);
+
+        this.requestRateMeter = requestMetricGroup.meter(METER, new MeterView(requestCounter));
+        this.requestFailedRateMeter =
+                failedRequestMetricGroup.meter(METER, new MeterView(failedRequestCounter));
+        this.responseRateMeter = responseMetricGroup.meter(METER, new MeterView(responseCounter));
+
+        this.responseLatency =
+                responseMetricGroup.histogram(
+                        HISTO,
+                        new DescriptiveStatisticsHistogram(
+                                flinkOperatorConfiguration.getMetricsHistogramSampleSize()));
+
+        Executors.newSingleThreadScheduledExecutor()
+                .scheduleAtFixedRate(this::updateMeters, 0, 1, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public Response intercept(Chain chain) throws IOException {
+        Request request = chain.request();
+        updateRequestMetrics(request);
+        Response response = null;
+        final long startTime = System.nanoTime();
+        try {
+            response = chain.proceed(request);
+            return response;
+        } finally {
+            updateResponseMetrics(response, startTime);
+        }
+    }
+
+    private void updateRequestMetrics(Request request) {
+        this.requestRateMeter.markEvent();
+        getCounterByRequestMethod(request.method()).inc();
+    }
+
+    private void updateResponseMetrics(Response response, long startTimeNanos) {
+        final long latency = System.nanoTime() - startTimeNanos;
+        if (response != null) {
+            this.responseRateMeter.markEvent();
+            this.responseLatency.update(latency);
+            getCounterByResponseCode(response.code()).inc();
+        } else {
+            this.requestFailedRateMeter.markEvent();
+        }
+    }
+
+    private Counter getCounterByRequestMethod(String method) {
+        return requestMethodCounter.computeIfAbsent(
+                method, key -> requestMetricGroup.addGroup(key).counter(COUNTER));
+    }
+
+    private Counter getCounterByResponseCode(int code) {
+        return responseCodeCounters.computeIfAbsent(
+                code, key -> responseMetricGroup.addGroup(key).counter(COUNTER));
+    }
+
+    private void updateMeters() {
+        this.requestRateMeter.update();
+        this.requestFailedRateMeter.update();
+        this.responseRateMeter.update();
+    }
+}
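Each `MeterView` in `KubernetesClientMetrics` wraps one of the counters and reports a rate derived from a ring buffer of counter snapshots that is refreshed by `update()`. The constructor schedules `updateMeters` once per second. Flink's `View` interface defines an `UPDATE_INTERVAL_SECONDS` constant that `MeterView` uses to size that buffer. The sketch below is a simplified, dependency-free model of this technique. The class name and the explicit `updateIntervalSeconds` parameter are assumptions for illustration, not Flink's API. Making the interval a parameter shows what happens when `update()` runs at a different cadence than the buffer was sized for.

```java
// Simplified model of a snapshot-ring rate meter; not Flink's MeterView source.
final class RingBufferRateSketch {
    private final long[] snapshots;  // counter value recorded at each update()
    private final int windowSeconds; // divisor that turns a count delta into a rate
    private int index = 0;
    private long count = 0;
    private double rate = 0.0;

    RingBufferRateSketch(int windowSeconds, int updateIntervalSeconds) {
        this.windowSeconds = windowSeconds;
        // One slot per expected update inside the window, plus one for the oldest snapshot.
        this.snapshots = new long[windowSeconds / updateIntervalSeconds + 1];
    }

    void markEvent() {
        count++;
    }

    /** Must run every updateIntervalSeconds for getRate() to be in events per second. */
    void update() {
        index = (index + 1) % snapshots.length;
        snapshots[index] = count;
        long oldest = snapshots[(index + 1) % snapshots.length];
        rate = (double) (count - oldest) / windowSeconds;
    }

    double getRate() {
        return rate;
    }
}
```

Simulating one `update()` per second with five events between updates, `new RingBufferRateSketch(10, 1)` settles at a rate of 5.0. Under the same per-second updates, `new RingBufferRateSketch(10, 5)` settles at 1.0: its three slots then span only two seconds of events, while the divisor is still ten.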
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index 924fc4c..ff6bed7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -30,8 +30,23 @@ public class KubernetesOperatorMetricOptions {
                     .withDescription(
                             "Enable forwarding of Java Operator SDK metrics to the Flink metric registry.");
 
+    public static final ConfigOption<Boolean> OPERATOR_KUBERNETES_CLIENT_METRICS_ENABLED =
+            ConfigOptions.key("kubernetes.operator.kubernetes.client.metrics.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server.");
+
+    public static final ConfigOption<Integer> OPERATOR_METRICS_HISTOGRAM_SAMPLE_SIZE =
+            ConfigOptions.key("kubernetes.operator.metrics.histogram.sample.size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "Defines the number of measured samples used when calculating histogram statistics.");
+
     public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR =
             ConfigOptions.key("metrics.scope.k8soperator.system")
+                    .stringType()
                     .defaultValue("<host>.k8soperator.<namespace>.<name>.system")
                     .withDeprecatedKeys("metrics.scope.k8soperator")
                     .withDescription(
@@ -39,12 +54,14 @@ public class KubernetesOperatorMetricOptions {
 
     public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCENS =
             ConfigOptions.key("metrics.scope.k8soperator.resourcens")
+                    .stringType()
                     .defaultValue("<host>.k8soperator.<namespace>.<name>.namespace.<resourcens>")
                     .withDescription(
                             "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator resource namespace.");
 
     public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR_RESOURCE =
             ConfigOptions.key("metrics.scope.k8soperator.resource")
+                    .stringType()
                     .defaultValue(
                             "<host>.k8soperator.<namespace>.<name>.resource.<resourcens>.<resourcename>")
                     .withDescription(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
new file mode 100644
index 0000000..46363b5
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesClientUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics;
+import org.apache.flink.metrics.MetricGroup;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory;
+import io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl;
+
+/** Kubernetes client utils. */
+public class KubernetesClientUtils {
+
+    public static KubernetesClient getKubernetesClient(
+            FlinkOperatorConfiguration operatorConfig, MetricGroup metricGroup) {
+        return getKubernetesClient(
+                operatorConfig, metricGroup, new DefaultKubernetesClient().getConfiguration());
+    }
+
+    @VisibleForTesting
+    public static KubernetesClient getKubernetesClient(
+            FlinkOperatorConfiguration operatorConfig,
+            MetricGroup metricGroup,
+            Config kubernetesClientConfig) {
+        var httpClientBuilder =
+                new OkHttpClientFactory()
+                        .createHttpClient(kubernetesClientConfig)
+                        .getOkHttpClient()
+                        .newBuilder();
+        if (operatorConfig.isKubernetesClientMetricsEnabled()) {
+            httpClientBuilder.addInterceptor(
+                    new KubernetesClientMetrics(metricGroup, operatorConfig));
+        }
+        return new DefaultKubernetesClient(
+                new OkHttpClientImpl(httpClientBuilder.build()), kubernetesClientConfig);
+    }
+}
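`getKubernetesClient` takes the OkHttp client that fabric8's `OkHttpClientFactory` builds from the Kubernetes `Config` and copies it with `newBuilder()`. It then adds `KubernetesClientMetrics` through `addInterceptor` only when `kubernetes.operator.kubernetes.client.metrics.enabled` is true. An OkHttp application interceptor sees the request before it calls `chain.proceed(...)` and the response (or exception) after it, which is what lets `intercept` time each call. The dependency-free sketch below models that chain of responsibility. `SketchInterceptor`, `SketchChain` and `SketchClient` are hypothetical stand-ins reduced to strings, not OkHttp's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for OkHttp's Interceptor and Interceptor.Chain.
interface SketchChain {
    String request();

    String proceed(String request);
}

interface SketchInterceptor {
    String intercept(SketchChain chain);
}

final class SketchClient {
    private final List<SketchInterceptor> interceptors;
    private final Function<String, String> transport; // stands in for the network call

    SketchClient(List<SketchInterceptor> interceptors, Function<String, String> transport) {
        this.interceptors = List.copyOf(interceptors);
        this.transport = transport;
    }

    /** Mirrors the conditional addInterceptor(...) in getKubernetesClient. */
    static SketchClient build(
            boolean metricsEnabled, List<String> log, Function<String, String> transport) {
        List<SketchInterceptor> interceptors = new ArrayList<>();
        if (metricsEnabled) {
            interceptors.add(
                    chain -> {
                        log.add("request " + chain.request());
                        String response = chain.proceed(chain.request());
                        log.add("response " + response);
                        return response;
                    });
        }
        return new SketchClient(interceptors, transport);
    }

    String execute(String request) {
        return proceedFrom(0, request);
    }

    // Each interceptor gets a chain whose proceed() invokes the next interceptor;
    // after the last one, the transport performs the call.
    private String proceedFrom(int index, String request) {
        if (index == interceptors.size()) {
            return transport.apply(request);
        }
        return interceptors
                .get(index)
                .intercept(
                        new SketchChain() {
                            @Override
                            public String request() {
                                return request;
                            }

                            @Override
                            public String proceed(String next) {
                                return proceedFrom(index + 1, next);
                            }
                        });
    }
}
```

With metrics enabled, a single `execute("GET /pods")` logs the request and then the response. With metrics disabled, the same call returns the same result and logs nothing, which is the situation `testMetricsDisabled` checks for.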
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
new file mode 100644
index 0000000..70ef11d
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.COUNTER;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.HISTO;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.HTTP_REQUEST_FAILED_GROUP;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.HTTP_REQUEST_GROUP;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.HTTP_RESPONSE_GROUP;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.KUBE_CLIENT_GROUP;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesClientMetrics.METER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** {@link KubernetesClientMetrics} tests. */
+@EnableKubernetesMockClient(crud = true)
+@TestMethodOrder(OrderAnnotation.class)
+public class KubernetesClientMetricsTest {
+    private KubernetesMockServer mockServer;
+    private final MetricListener listener = new MetricListener();
+
+    private static final String REQUEST_COUNTER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_REQUEST_GROUP, COUNTER);
+    private static final String REQUEST_METER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_REQUEST_GROUP, METER);
+    private static final String REQUEST_FAILED_METER_ID =
+            String.join(
+                    ".", KUBE_CLIENT_GROUP, HTTP_REQUEST_GROUP, HTTP_REQUEST_FAILED_GROUP, METER);
+    private static final String REQUEST_POST_COUNTER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_REQUEST_GROUP, "POST", COUNTER);
+    private static final String REQUEST_DELETE_COUNTER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_REQUEST_GROUP, "DELETE", COUNTER);
+    private static final String REQUEST_FAILED_COUNTER_ID =
+            String.join(
+                    ".", KUBE_CLIENT_GROUP, HTTP_REQUEST_GROUP, HTTP_REQUEST_FAILED_GROUP, COUNTER);
+    private static final String RESPONSE_COUNTER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, COUNTER);
+    private static final String RESPONSE_METER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, METER);
+    private static final String RESPONSE_200_COUNTER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, "200", COUNTER);
+    private static final String RESPONSE_404_COUNTER_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, "404", COUNTER);
+    private static final String RESPONSE_LATENCY_ID =
+            String.join(".", KUBE_CLIENT_GROUP, HTTP_RESPONSE_GROUP, HISTO);
+
+    @Test
+    @Order(1)
+    public void testMetricsDisabled() {
+        var deployment = TestUtils.buildApplicationCluster();
+        KubernetesClient noMetricsClient = getKubernetesClient(false);
+        noMetricsClient.resource(deployment).get();
+        assertFalse(listener.getCounter(REQUEST_COUNTER_ID).isPresent());
+        assertFalse(listener.getMeter(REQUEST_METER_ID).isPresent());
+        assertFalse(listener.getCounter(REQUEST_FAILED_COUNTER_ID).isPresent());
+        assertFalse(listener.getMeter(REQUEST_FAILED_METER_ID).isPresent());
+        assertFalse(listener.getCounter(RESPONSE_COUNTER_ID).isPresent());
+        assertFalse(listener.getMeter(RESPONSE_METER_ID).isPresent());
+        assertFalse(listener.getHistogram(RESPONSE_LATENCY_ID).isPresent());
+        assertFalse(listener.getHistogram(RESPONSE_LATENCY_ID).isPresent());
+    }
+
+    @Test
+    @Order(2)
+    public void testMetricsEnabled() {
+        KubernetesClient kubernetesClient = getKubernetesClient(true);
+        var deployment = TestUtils.buildApplicationCluster();
+        assertEquals(0, listener.getCounter(REQUEST_COUNTER_ID).get().getCount());
+        assertEquals(0.0, listener.getMeter(REQUEST_METER_ID).get().getRate());
+        assertEquals(0, listener.getCounter(REQUEST_FAILED_COUNTER_ID).get().getCount());
+        assertEquals(0.0, listener.getMeter(REQUEST_FAILED_METER_ID).get().getRate());
+        assertEquals(0, listener.getCounter(RESPONSE_COUNTER_ID).get().getCount());
+        assertEquals(0.0, listener.getMeter(RESPONSE_METER_ID).get().getRate());
+        assertEquals(0, listener.getHistogram(RESPONSE_LATENCY_ID).get().getStatistics().getMin());
+        assertEquals(0, listener.getHistogram(RESPONSE_LATENCY_ID).get().getStatistics().getMax());
+
+        kubernetesClient.resource(deployment).createOrReplace();
+        assertEquals(1, listener.getCounter(REQUEST_COUNTER_ID).get().getCount());
+        assertEquals(1, listener.getCounter(REQUEST_POST_COUNTER_ID).get().getCount());
+        assertEquals(1, listener.getCounter(RESPONSE_COUNTER_ID).get().getCount());
+        assertEquals(1, listener.getCounter(RESPONSE_200_COUNTER_ID).get().getCount());
+        assertTrue(listener.getHistogram(RESPONSE_LATENCY_ID).get().getStatistics().getMin() > 0);
+        assertTrue(listener.getHistogram(RESPONSE_LATENCY_ID).get().getStatistics().getMax() > 0);
+
+        kubernetesClient.resource(deployment).delete();
+        assertEquals(1, listener.getCounter(REQUEST_DELETE_COUNTER_ID).get().getCount());
+
+        kubernetesClient.resource(deployment).delete();
+        assertEquals(2, listener.getCounter(REQUEST_DELETE_COUNTER_ID).get().getCount());
+        assertEquals(1, listener.getCounter(RESPONSE_404_COUNTER_ID).get().getCount());
+        Awaitility.await()
+                .atMost(1, TimeUnit.MINUTES)
+                .until(
+                        () -> {
+                            kubernetesClient.resource(deployment).createOrReplace();
+                            return listener.getMeter(REQUEST_METER_ID).get().getRate() > 0.1
+                                    && listener.getMeter(RESPONSE_METER_ID).get().getRate() > 0.1;
+                        });
+    }
+
+    @Test
+    @Order(3)
+    public void testAPIServerIsDown() {
+        var deployment = TestUtils.buildApplicationCluster();
+        KubernetesClient kubernetesClient = getKubernetesClient(true);
+        mockServer.shutdown();
+        assertEquals(0, listener.getCounter(REQUEST_FAILED_COUNTER_ID).get().getCount());
+        assertEquals(0.0, listener.getMeter(REQUEST_FAILED_METER_ID).get().getRate());
+        Awaitility.await()
+                .atMost(1, TimeUnit.MINUTES)
+                .until(
+                        () -> {
+                            assertThrows(
+                                    KubernetesClientException.class,
+                                    () -> kubernetesClient.resource(deployment).createOrReplace());
+                            return listener.getCounter(REQUEST_FAILED_COUNTER_ID).get().getCount()
+                                            > 0
+                                    && listener.getMeter(REQUEST_FAILED_METER_ID).get().getRate()
+                                            > 0.1;
+                        });
+    }
+
+    private KubernetesClient getKubernetesClient(boolean enableMetrics) {
+        var configuration = new Configuration();
+        configuration.setBoolean(
+                KubernetesOperatorMetricOptions.OPERATOR_KUBERNETES_CLIENT_METRICS_ENABLED,
+                enableMetrics);
+        var configManager = new FlinkConfigManager(configuration);
+        return KubernetesClientUtils.getKubernetesClient(
+                configManager.getOperatorConfiguration(),
+                listener.getMetricGroup(),
+                mockServer.createClient().getConfiguration());
+    }
+}