You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/08/18 13:26:04 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28218] Add liveness probe to k8s operator

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 199ed194 [FLINK-28218] Add liveness probe to k8s operator
199ed194 is described below

commit 199ed194f81502d3e146265c1694eddaf07e838e
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Aug 18 15:25:59 2022 +0200

    [FLINK-28218] Add liveness probe to k8s operator
---
 docs/content/docs/operations/helm.md               |  3 +
 .../kubernetes_operator_config_configuration.html  | 12 ++++
 .../generated/system_advanced_section.html         | 18 +++++
 .../shortcodes/generated/system_section.html       |  6 --
 .../flink/kubernetes/operator/FlinkOperator.java   | 17 +++--
 .../config/KubernetesOperatorConfigOptions.java    | 16 ++++-
 .../controller/FlinkDeploymentController.java      |  1 +
 .../kubernetes/operator/health/HealthProbe.java    | 35 ++++++++++
 .../kubernetes/operator/health/HttpBootstrap.java  | 81 ++++++++++++++++++++++
 .../operator/health/OperatorHealthHandler.java     | 75 ++++++++++++++++++++
 .../operator/health/OperatorHealthService.java     | 68 ++++++++++++++++++
 .../operator/health/OperatorHealthServiceTest.java | 64 +++++++++++++++++
 .../templates/flink-operator.yaml                  | 25 ++++++-
 helm/flink-kubernetes-operator/values.yaml         | 10 +++
 14 files changed, 418 insertions(+), 13 deletions(-)

diff --git a/docs/content/docs/operations/helm.md b/docs/content/docs/operations/helm.md
index 49378b84..05d1ccf4 100644
--- a/docs/content/docs/operations/helm.md
+++ b/docs/content/docs/operations/helm.md
@@ -96,6 +96,9 @@ The configurable parameters of the Helm chart and which default values as detail
 | fullnameOverride | Overrides the fullname with the specified full name. | |
 | jvmArgs.webhook | The JVM start up options for webhook. | |
 | jvmArgs.operator |  The JVM start up options for operator. | |
+| operatorHealth.port |  Operator health endpoint port to be used by the probes. | 8085 |
+| operatorHealth.livenessProbe |  Liveness probe configuration for the operator, using the health endpoint. Only timing settings (e.g. `periodSeconds`) should be configured; the endpoint is set automatically based on `operatorHealth.port`. | |
+| operatorHealth.startupProbe |  Startup probe configuration for the operator, using the health endpoint. Only timing settings (e.g. `failureThreshold`) should be configured; the endpoint is set automatically based on `operatorHealth.port`. | |
 
 For more information check the [Helm documentation](https://helm.sh/docs/helm/helm_install/).
 
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index aaf1e936..f00a1204 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -62,6 +62,18 @@
             <td>Duration</td>
             <td>The timeout for the observer to wait the flink rest client to return.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.health.probe.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Enables health probe for the kubernetes operator.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.health.probe.port</h5></td>
+            <td style="word-wrap: break-word;">8085</td>
+            <td>Integer</td>
+            <td>The port the health probe will use to expose the status.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.jm-deployment-recovery.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/docs/layouts/shortcodes/generated/system_advanced_section.html b/docs/layouts/shortcodes/generated/system_advanced_section.html
index 72d525da..a47d7df2 100644
--- a/docs/layouts/shortcodes/generated/system_advanced_section.html
+++ b/docs/layouts/shortcodes/generated/system_advanced_section.html
@@ -32,6 +32,24 @@
             <td>Boolean</td>
             <td>Whether to enable on-the-fly config changes through the operator configmap.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.health.probe.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Enables health probe for the kubernetes operator.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.health.probe.port</h5></td>
+            <td style="word-wrap: break-word;">8085</td>
+            <td>Integer</td>
+            <td>The port the health probe will use to expose the status.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.label.selector</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Label selector of the custom resources to be watched. Please see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors for the format supported.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.observer.progress-check.interval</h5></td>
             <td style="word-wrap: break-word;">10 s</td>
diff --git a/docs/layouts/shortcodes/generated/system_section.html b/docs/layouts/shortcodes/generated/system_section.html
index d6b8cab8..a4428aa7 100644
--- a/docs/layouts/shortcodes/generated/system_section.html
+++ b/docs/layouts/shortcodes/generated/system_section.html
@@ -26,12 +26,6 @@
             <td>Duration</td>
             <td>The timeout for the observer to wait the flink rest client to return.</td>
         </tr>
-        <tr>
-            <td><h5>kubernetes.operator.label.selector</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>Label selector of the custom resources to be watched. Please see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors for the format supported.</td>
-        </tr>
         <tr>
             <td><h5>kubernetes.operator.reconcile.interval</h5></td>
             <td style="word-wrap: break-word;">1 min</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 202ccdeb..d4a97124 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
+import org.apache.flink.kubernetes.operator.health.OperatorHealthService;
 import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
 import org.apache.flink.kubernetes.operator.listener.ListenerUtils;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
@@ -72,14 +73,16 @@ public class FlinkOperator {
     @VisibleForTesting final Set<RegisteredController<?>> registeredControllers = new HashSet<>();
     private final KubernetesOperatorMetricGroup metricGroup;
     private final Collection<FlinkResourceListener> listeners;
+    private final OperatorHealthService operatorHealthService;
 
     public FlinkOperator(@Nullable Configuration conf) {
         this.configManager =
                 conf != null
                         ? new FlinkConfigManager(conf) // For testing only
                         : new FlinkConfigManager(this::handleNamespaceChanges);
-        this.metricGroup =
-                OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+
+        var defaultConfig = configManager.getDefaultConfig();
+        this.metricGroup = OperatorMetricUtils.initOperatorMetrics(defaultConfig);
         this.client =
                 KubernetesClientUtils.getKubernetesClient(
                         configManager.getOperatorConfiguration(), this.metricGroup);
@@ -87,9 +90,9 @@ public class FlinkOperator {
         this.flinkServiceFactory = new FlinkServiceFactory(client, configManager);
         this.validators = ValidatorUtils.discoverValidators(configManager);
         this.listeners = ListenerUtils.discoverListeners(configManager);
-        PluginManager pluginManager =
-                PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
-        FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
+        PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(defaultConfig);
+        FileSystem.initialize(defaultConfig, pluginManager);
+        this.operatorHealthService = OperatorHealthService.fromConfig(configManager);
     }
 
     private void handleNamespaceChanges(Set<String> namespaces) {
@@ -183,6 +186,10 @@ public class FlinkOperator {
         registerSessionJobController();
         operator.installShutdownHook();
         operator.start();
+        if (operatorHealthService != null) {
+            Runtime.getRuntime().addShutdownHook(new Thread(operatorHealthService::stop));
+            operatorHealthService.start();
+        }
     }
 
     public static void main(String... args) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 11f84b7c..5736c540 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -234,7 +234,7 @@ public class KubernetesOperatorConfigOptions {
                     .withDescription(
                             "Comma separated list of namespaces the operator monitors for custom resources.");
 
-    @Documentation.Section(SECTION_SYSTEM)
+    @Documentation.Section(SECTION_ADVANCED)
     public static final ConfigOption<String> OPERATOR_LABEL_SELECTOR =
             ConfigOptions.key("kubernetes.operator.label.selector")
                     .stringType()
@@ -289,4 +289,18 @@ public class KubernetesOperatorConfigOptions {
                     .defaultValue(SavepointFormatType.DEFAULT)
                     .withDescription(
                             "Type of the binary format in which a savepoint should be taken.");
+
+    @Documentation.Section(SECTION_ADVANCED)
+    public static final ConfigOption<Boolean> OPERATOR_HEALTH_PROBE_ENABLED =
+            ConfigOptions.key("kubernetes.operator.health.probe.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Enables health probe for the kubernetes operator.");
+
+    @Documentation.Section(SECTION_ADVANCED)
+    public static final ConfigOption<Integer> OPERATOR_HEALTH_PROBE_PORT =
+            ConfigOptions.key("kubernetes.operator.health.probe.port")
+                    .intType()
+                    .defaultValue(8085)
+                    .withDescription("The port the health probe will use to expose the status.");
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 48fad9d7..a0b0a161 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -98,6 +98,7 @@ public class FlinkDeploymentController
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context)
             throws Exception {
+
         LOG.info("Starting reconciliation");
         statusRecorder.updateStatusFromCache(flinkApp);
         FlinkDeployment previousDeployment = ReconciliationUtils.clone(flinkApp);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java
new file mode 100644
index 00000000..bea1d8e3
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** Process-wide, thread-safe health flag: healthy until {@link #markUnhealthy()} is called. */
+public enum HealthProbe {
+    INSTANCE;
+
+    private final AtomicBoolean healthy = new AtomicBoolean(true);
+
+    public boolean isHealthy() {
+        return healthy.get();
+    }
+
+    public void markUnhealthy() {
+        healthy.set(false);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HttpBootstrap.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HttpBootstrap.java
new file mode 100644
index 00000000..65b1cb84
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HttpBootstrap.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Netty HTTP server answering every request with the operator health status. */
+public class HttpBootstrap {
+    private static final Logger LOG = LoggerFactory.getLogger(HttpBootstrap.class);
+    private static final int MAX_REQUEST_LENGTH = 65536;
+
+    private final NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
+    private final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+    private final Channel serverChannel;
+
+    /** Binds to {@code port}, blocking until bound; {@code probe} is unused (see handler). */
+    public HttpBootstrap(HealthProbe probe, int port) throws InterruptedException {
+        LOG.info("Health probe HTTP endpoint starting...");
+        ChannelInitializer<SocketChannel> initializer =
+                new ChannelInitializer<>() {
+                    @Override
+                    protected void initChannel(SocketChannel ch) {
+                        ChannelPipeline pipeline = ch.pipeline();
+                        pipeline.addLast(new HttpServerCodec());
+                        pipeline.addLast(new HttpObjectAggregator(MAX_REQUEST_LENGTH));
+                        pipeline.addLast(new OperatorHealthHandler());
+                    }
+                };
+        ServerBootstrap bootstrap =
+                new ServerBootstrap()
+                        .group(bossGroup, workerGroup)
+                        .channel(NioServerSocketChannel.class)
+                        .childHandler(initializer);
+        try {
+            this.serverChannel = bootstrap.bind(port).sync().channel();
+        } catch (Throwable t) {
+            // A failed bind (e.g. port in use) must not leave the event loop threads running.
+            shutdownEventLoops();
+            throw t;
+        }
+        LOG.info("Health probe HTTP endpoint started on port {}", port);
+    }
+
+    /** Closes the server channel and shuts down both event loop groups. */
+    public void stop() {
+        serverChannel.close().awaitUninterruptibly();
+        shutdownEventLoops();
+    }
+
+    /** Releases the boss and worker event loop groups. */
+    private void shutdownEventLoops() {
+        bossGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/OperatorHealthHandler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/OperatorHealthHandler.java
new file mode 100644
index 00000000..d4098b41
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/OperatorHealthHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Simple code which returns HTTP 200 messages if the service is live, and HTTP 500 messages if the
+ * service is down. The response is based on the {@link HealthProbe#isHealthy()} result.
+ */
+@ChannelHandler.Sharable
+public class OperatorHealthHandler extends SimpleChannelInboundHandler<HttpObject> {
+
+    private final HealthProbe probe = HealthProbe.INSTANCE;
+
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
+        try {
+            if (probe.isHealthy()) {
+                ctx.writeAndFlush(createResponse("OK", HttpResponseStatus.OK));
+            } else {
+                ctx.writeAndFlush(
+                        createResponse("ERROR", HttpResponseStatus.INTERNAL_SERVER_ERROR));
+            }
+        } catch (Throwable t) {
+            if (ctx.channel().isActive()) {
+                ctx.writeAndFlush(
+                        createResponse(
+                                ExceptionUtils.stringifyException(t),
+                                HttpResponseStatus.INTERNAL_SERVER_ERROR));
+            }
+        }
+    }
+
+    @NotNull
+    private DefaultFullHttpResponse createResponse(String content, HttpResponseStatus status) {
+        ByteBuf buff = Unpooled.copiedBuffer(content, StandardCharsets.UTF_8);
+        DefaultFullHttpResponse response =
+                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buff);
+
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");
+        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, buff.readableBytes());
+        return response;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/OperatorHealthService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/OperatorHealthService.java
new file mode 100644
index 00000000..c0e67e8b
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/OperatorHealthService.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Health probe service. */
+public class OperatorHealthService {
+    private static final Logger LOG = LoggerFactory.getLogger(OperatorHealthService.class);
+
+    private final HealthProbe probe = HealthProbe.INSTANCE;
+    private final FlinkConfigManager configManager;
+    private HttpBootstrap httpBootstrap = null;
+
+    public OperatorHealthService(FlinkConfigManager configManager) {
+        this.configManager = configManager;
+    }
+
+    public void start() {
+        // Start the service for the http endpoint
+        try {
+            httpBootstrap =
+                    new HttpBootstrap(
+                            probe,
+                            configManager
+                                    .getDefaultConfig()
+                                    .get(
+                                            KubernetesOperatorConfigOptions
+                                                    .OPERATOR_HEALTH_PROBE_PORT));
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void stop() {
+        httpBootstrap.stop();
+    }
+
+    public static OperatorHealthService fromConfig(FlinkConfigManager configManager) {
+        var defaultConfig = configManager.getDefaultConfig();
+        if (defaultConfig.getBoolean(
+                KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_ENABLED)) {
+            return new OperatorHealthService(configManager);
+        } else {
+            LOG.info("Health probe disabled");
+            return null;
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/OperatorHealthServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/OperatorHealthServiceTest.java
new file mode 100644
index 00000000..fea45c6f
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/OperatorHealthServiceTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.util.NetUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for Health probe and endpoint logic. */
+public class OperatorHealthServiceTest {
+    @Test
+    public void testHealthService() throws Exception {
+        try (var port = NetUtils.getAvailablePort()) {
+            var conf = new Configuration();
+            conf.set(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT, port.getPort());
+            var healthService = new OperatorHealthService(new FlinkConfigManager(conf));
+            healthService.start();
+            try {
+                assertTrue(callHealthEndpoint(port.getPort()));
+                HealthProbe.INSTANCE.markUnhealthy(); // NOTE: irreversible, JVM-wide state
+                assertFalse(callHealthEndpoint(port.getPort()));
+            } finally {
+                healthService.stop(); // release the Netty server even if an assertion fails
+            }
+        }
+    }
+
+    private boolean callHealthEndpoint(int port) throws Exception {
+        var url = new URL("http://localhost:" + port + "/");
+        var connection = (HttpURLConnection) url.openConnection();
+        connection.setConnectTimeout(10_000);
+        connection.setReadTimeout(10_000);
+        try {
+            return connection.getResponseCode() == OK.code();
+        } finally {
+            connection.disconnect();
+        }
+    }
+}
diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
index abc783c1..ff8b8c9b 100644
--- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
+++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
@@ -59,12 +59,17 @@ spec:
           image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
           imagePullPolicy: {{ .Values.image.pullPolicy }}
           command: ["/docker-entrypoint.sh", "operator"]
-          {{- if .Values.metrics.port }}
           ports:
+          {{- if .Values.metrics.port }}
             - containerPort: {{ .Values.metrics.port }}
               name: metrics
               protocol: TCP
           {{- end }}
+          {{- if index .Values "operatorHealth" }}
+            - containerPort: {{ .Values.operatorHealth.port }}
+              name: health-port
+              protocol: TCP
+          {{- end }}
           env:
             - name: OPERATOR_NAMESPACE
               value: {{ .Release.Namespace }}
@@ -90,6 +95,20 @@ spec:
             {{- if .Values.operatorVolumeMounts.create }}
                 {{- toYaml .Values.operatorVolumeMounts.data | nindent 12 }}
             {{- end }}
+          {{- if and (index .Values "operatorHealth") (index .Values.operatorHealth "livenessProbe") }}
+          livenessProbe:
+            {{- toYaml .Values.operatorHealth.livenessProbe | nindent 12 }}
+            httpGet:
+              path: /
+              port: health-port
+          {{- end }}
+          {{- if and (index .Values "operatorHealth") (index .Values.operatorHealth "startupProbe") }}
+          startupProbe:
+            {{- toYaml .Values.operatorHealth.startupProbe | nindent 12 }}
+            httpGet:
+              path: /
+              port: health-port
+          {{- end }}
         {{- if eq (include "webhook-enabled" .) "true" }}
         - name: flink-webhook
           image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
@@ -173,6 +192,10 @@ data:
 {{- end }}
 {{- if .Values.watchNamespaces }}
     kubernetes.operator.watched.namespaces: {{ join "," .Values.watchNamespaces  }}
+{{- end }}
+{{- if index .Values "operatorHealth" }}
+    kubernetes.operator.health.probe.enabled: true
+    kubernetes.operator.health.probe.port: {{ .Values.operatorHealth.port }}
 {{- end }}
   log4j-operator.properties: |+
 {{- if .Values.defaultConfiguration.append }}
diff --git a/helm/flink-kubernetes-operator/values.yaml b/helm/flink-kubernetes-operator/values.yaml
index 7e4511ae..7bb91cc5 100644
--- a/helm/flink-kubernetes-operator/values.yaml
+++ b/helm/flink-kubernetes-operator/values.yaml
@@ -124,3 +124,13 @@ fullnameOverride: ""
 jvmArgs:
   webhook: ""
   operator: ""
+
+# Configure health probes for the operator
+operatorHealth:
+  port: 8085
+  livenessProbe:
+    periodSeconds: 10
+    initialDelaySeconds: 30
+  startupProbe:
+    failureThreshold: 30
+    periodSeconds: 10