You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/08/18 13:26:04 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28218] Add liveness probe to k8s operator
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 199ed194 [FLINK-28218] Add liveness probe to k8s operator
199ed194 is described below
commit 199ed194f81502d3e146265c1694eddaf07e838e
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Aug 18 15:25:59 2022 +0200
[FLINK-28218] Add liveness probe to k8s operator
---
docs/content/docs/operations/helm.md | 3 +
.../kubernetes_operator_config_configuration.html | 12 ++++
.../generated/system_advanced_section.html | 18 +++++
.../shortcodes/generated/system_section.html | 6 --
.../flink/kubernetes/operator/FlinkOperator.java | 17 +++--
.../config/KubernetesOperatorConfigOptions.java | 16 ++++-
.../controller/FlinkDeploymentController.java | 1 +
.../kubernetes/operator/health/HealthProbe.java | 35 ++++++++++
.../kubernetes/operator/health/HttpBootstrap.java | 81 ++++++++++++++++++++++
.../operator/health/OperatorHealthHandler.java | 75 ++++++++++++++++++++
.../operator/health/OperatorHealthService.java | 68 ++++++++++++++++++
.../operator/health/OperatorHealthServiceTest.java | 64 +++++++++++++++++
.../templates/flink-operator.yaml | 25 ++++++-
helm/flink-kubernetes-operator/values.yaml | 10 +++
14 files changed, 418 insertions(+), 13 deletions(-)
diff --git a/docs/content/docs/operations/helm.md b/docs/content/docs/operations/helm.md
index 49378b84..05d1ccf4 100644
--- a/docs/content/docs/operations/helm.md
+++ b/docs/content/docs/operations/helm.md
@@ -96,6 +96,9 @@ The configurable parameters of the Helm chart and which default values as detail
| fullnameOverride | Overrides the fullname with the specified full name. | |
| jvmArgs.webhook | The JVM start up options for webhook. | |
| jvmArgs.operator | The JVM start up options for operator. | |
+| operatorHealth.port | Operator health endpoint port to be used by the probes. | 8085 |
+| operatorHealth.livenessProbe | Liveness probe configuration for the operator using the health endpoint. Only timing settings should be configured; the endpoint is set automatically based on `operatorHealth.port`. | |
+| operatorHealth.startupProbe | Startup probe configuration for the operator using the health endpoint. Only timing settings should be configured; the endpoint is set automatically based on `operatorHealth.port`. | |
For more information check the [Helm documentation](https://helm.sh/docs/helm/helm_install/).
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index aaf1e936..f00a1204 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -62,6 +62,18 @@
<td>Duration</td>
<td>The timeout for the observer to wait the flink rest client to return.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.health.probe.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Enables health probe for the kubernetes operator.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.health.probe.port</h5></td>
+ <td style="word-wrap: break-word;">8085</td>
+ <td>Integer</td>
+ <td>The port the health probe will use to expose the status.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.jm-deployment-recovery.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git a/docs/layouts/shortcodes/generated/system_advanced_section.html b/docs/layouts/shortcodes/generated/system_advanced_section.html
index 72d525da..a47d7df2 100644
--- a/docs/layouts/shortcodes/generated/system_advanced_section.html
+++ b/docs/layouts/shortcodes/generated/system_advanced_section.html
@@ -32,6 +32,24 @@
<td>Boolean</td>
<td>Whether to enable on-the-fly config changes through the operator configmap.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.health.probe.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Enables health probe for the kubernetes operator.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.health.probe.port</h5></td>
+ <td style="word-wrap: break-word;">8085</td>
+ <td>Integer</td>
+ <td>The port the health probe will use to expose the status.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.label.selector</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Label selector of the custom resources to be watched. Please see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors for the format supported.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.observer.progress-check.interval</h5></td>
<td style="word-wrap: break-word;">10 s</td>
diff --git a/docs/layouts/shortcodes/generated/system_section.html b/docs/layouts/shortcodes/generated/system_section.html
index d6b8cab8..a4428aa7 100644
--- a/docs/layouts/shortcodes/generated/system_section.html
+++ b/docs/layouts/shortcodes/generated/system_section.html
@@ -26,12 +26,6 @@
<td>Duration</td>
<td>The timeout for the observer to wait the flink rest client to return.</td>
</tr>
- <tr>
- <td><h5>kubernetes.operator.label.selector</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>String</td>
- <td>Label selector of the custom resources to be watched. Please see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors for the format supported.</td>
- </tr>
<tr>
<td><h5>kubernetes.operator.reconcile.interval</h5></td>
<td style="word-wrap: break-word;">1 min</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 202ccdeb..d4a97124 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
+import org.apache.flink.kubernetes.operator.health.OperatorHealthService;
import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.listener.ListenerUtils;
import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
@@ -72,14 +73,16 @@ public class FlinkOperator {
@VisibleForTesting final Set<RegisteredController<?>> registeredControllers = new HashSet<>();
private final KubernetesOperatorMetricGroup metricGroup;
private final Collection<FlinkResourceListener> listeners;
+ private final OperatorHealthService operatorHealthService;
public FlinkOperator(@Nullable Configuration conf) {
this.configManager =
conf != null
? new FlinkConfigManager(conf) // For testing only
: new FlinkConfigManager(this::handleNamespaceChanges);
- this.metricGroup =
- OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+
+ var defaultConfig = configManager.getDefaultConfig();
+ this.metricGroup = OperatorMetricUtils.initOperatorMetrics(defaultConfig);
this.client =
KubernetesClientUtils.getKubernetesClient(
configManager.getOperatorConfiguration(), this.metricGroup);
@@ -87,9 +90,9 @@ public class FlinkOperator {
this.flinkServiceFactory = new FlinkServiceFactory(client, configManager);
this.validators = ValidatorUtils.discoverValidators(configManager);
this.listeners = ListenerUtils.discoverListeners(configManager);
- PluginManager pluginManager =
- PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
- FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
+ PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(defaultConfig);
+ FileSystem.initialize(defaultConfig, pluginManager);
+ this.operatorHealthService = OperatorHealthService.fromConfig(configManager);
}
private void handleNamespaceChanges(Set<String> namespaces) {
@@ -183,6 +186,10 @@ public class FlinkOperator {
registerSessionJobController();
operator.installShutdownHook();
operator.start();
+ if (operatorHealthService != null) {
+ Runtime.getRuntime().addShutdownHook(new Thread(operatorHealthService::stop));
+ operatorHealthService.start();
+ }
}
public static void main(String... args) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 11f84b7c..5736c540 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -234,7 +234,7 @@ public class KubernetesOperatorConfigOptions {
.withDescription(
"Comma separated list of namespaces the operator monitors for custom resources.");
- @Documentation.Section(SECTION_SYSTEM)
+ @Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<String> OPERATOR_LABEL_SELECTOR =
ConfigOptions.key("kubernetes.operator.label.selector")
.stringType()
@@ -289,4 +289,18 @@ public class KubernetesOperatorConfigOptions {
.defaultValue(SavepointFormatType.DEFAULT)
.withDescription(
"Type of the binary format in which a savepoint should be taken.");
+
+ @Documentation.Section(SECTION_ADVANCED)
+ public static final ConfigOption<Boolean> OPERATOR_HEALTH_PROBE_ENABLED =
+ ConfigOptions.key("kubernetes.operator.health.probe.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Enables health probe for the kubernetes operator.");
+
+ @Documentation.Section(SECTION_ADVANCED)
+ public static final ConfigOption<Integer> OPERATOR_HEALTH_PROBE_PORT =
+ ConfigOptions.key("kubernetes.operator.health.probe.port")
+ .intType()
+ .defaultValue(8085)
+ .withDescription("The port the health probe will use to expose the status.");
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 48fad9d7..a0b0a161 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -98,6 +98,7 @@ public class FlinkDeploymentController
@Override
public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context)
throws Exception {
+
LOG.info("Starting reconciliation");
statusRecorder.updateStatusFromCache(flinkApp);
FlinkDeployment previousDeployment = ReconciliationUtils.clone(flinkApp);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java
new file mode 100644
index 00000000..bea1d8e3
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Process-wide health flag of the Flink Kubernetes operator.
+ *
+ * <p>The flag starts out healthy and can only be switched to unhealthy; once {@link
+ * #markUnhealthy()} has been called there is no way to reset it.
+ */
+public enum HealthProbe {
+    INSTANCE;
+
+    /** Current health state; {@code true} until {@link #markUnhealthy()} is called. */
+    private final AtomicBoolean healthy = new AtomicBoolean(true);
+
+    /** Switches the probe to unhealthy. There is no way to switch it back. */
+    public void markUnhealthy() {
+        healthy.set(false);
+    }
+
+    /**
+     * Returns the current health state.
+     *
+     * @return {@code false} once {@link #markUnhealthy()} has been called, {@code true} before
+     */
+    public boolean isHealthy() {
+        return healthy.get();
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HttpBootstrap.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HttpBootstrap.java
new file mode 100644
index 00000000..65b1cb84
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HttpBootstrap.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty-based HTTP server that exposes the operator health endpoint.
+ *
+ * <p>Every request is answered by an {@link OperatorHealthHandler}. The server socket is bound
+ * synchronously in the constructor and released again by {@link #stop()}.
+ */
+public class HttpBootstrap {
+    private static final Logger LOG = LoggerFactory.getLogger(HttpBootstrap.class);
+
+    /** Upper bound, in bytes, for an aggregated HTTP request. */
+    private static final int MAX_REQUEST_LENGTH = 65536;
+
+    private final ServerBootstrap bootstrap;
+    private final Channel serverChannel;
+
+    /**
+     * Starts the health endpoint and blocks until the server socket is bound.
+     *
+     * @param probe health probe backing the endpoint; not used by this class, the handler reads
+     *     {@link HealthProbe#INSTANCE} directly
+     * @param port port to bind the HTTP server to
+     * @throws InterruptedException if interrupted while waiting for the bind to complete
+     */
+    public HttpBootstrap(HealthProbe probe, int port) throws InterruptedException {
+        LOG.info("Health probe HTTP endpoint starting...");
+        ChannelInitializer<SocketChannel> initializer =
+                new ChannelInitializer<>() {
+                    @Override
+                    protected void initChannel(SocketChannel ch) {
+                        ChannelPipeline pipeline = ch.pipeline();
+                        pipeline.addLast(new HttpServerCodec());
+                        pipeline.addLast(new HttpObjectAggregator(MAX_REQUEST_LENGTH));
+                        pipeline.addLast(new OperatorHealthHandler());
+                    }
+                };
+
+        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
+        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+        this.bootstrap = new ServerBootstrap();
+
+        bootstrap
+                .group(bossGroup, workerGroup)
+                .channel(NioServerSocketChannel.class)
+                .childHandler(initializer);
+
+        try {
+            this.serverChannel = bootstrap.bind(port).sync().channel();
+        } catch (Throwable t) {
+            // The constructor is failing, so stop() can never be called on this instance:
+            // release the event loop threads here instead of leaking them (e.g. port in use).
+            bossGroup.shutdownGracefully();
+            workerGroup.shutdownGracefully();
+            throw t;
+        }
+        LOG.info("Health probe HTTP endpoint started on port {}", port);
+    }
+
+    /** Closes the server channel and gracefully shuts down both event loop groups. */
+    public void stop() {
+        if (this.serverChannel != null) {
+            this.serverChannel.close().awaitUninterruptibly();
+        }
+        if (bootstrap != null) {
+            if (bootstrap.group() != null) {
+                bootstrap.group().shutdownGracefully();
+            }
+            if (bootstrap.childGroup() != null) {
+                bootstrap.childGroup().shutdownGracefully();
+            }
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/OperatorHealthHandler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/OperatorHealthHandler.java
new file mode 100644
index 00000000..d4098b41
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/OperatorHealthHandler.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Netty handler that answers every HTTP request with the operator health state.
+ *
+ * <p>Responds with HTTP 200 and body {@code OK} while {@link HealthProbe#isHealthy()} returns
+ * {@code true}, and with HTTP 500 and body {@code ERROR} otherwise. The request path and method
+ * are not inspected.
+ */
+@ChannelHandler.Sharable
+public class OperatorHealthHandler extends SimpleChannelInboundHandler<HttpObject> {
+
+    private final HealthProbe probe = HealthProbe.INSTANCE;
+
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
+        try {
+            if (probe.isHealthy()) {
+                ctx.writeAndFlush(createResponse("OK", HttpResponseStatus.OK));
+            } else {
+                ctx.writeAndFlush(
+                        createResponse("ERROR", HttpResponseStatus.INTERNAL_SERVER_ERROR));
+            }
+        } catch (Throwable t) {
+            // Building or writing the response failed: reply 500 with the stringified exception.
+            if (ctx.channel().isActive()) {
+                ctx.writeAndFlush(
+                        createResponse(
+                                ExceptionUtils.stringifyException(t),
+                                HttpResponseStatus.INTERNAL_SERVER_ERROR));
+            }
+        }
+    }
+
+    /**
+     * Closes the connection when an exception reaches this handler (for example an I/O error on
+     * the socket). Without this override the exception would travel to the end of the pipeline,
+     * where Netty only logs it and leaves the connection open.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        ctx.close();
+    }
+
+    /** Builds a plain-text UTF-8 HTTP/1.1 response with the given body and status. */
+    @NotNull
+    private DefaultFullHttpResponse createResponse(String content, HttpResponseStatus status) {
+        ByteBuf buff = Unpooled.copiedBuffer(content, StandardCharsets.UTF_8);
+        DefaultFullHttpResponse response =
+                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buff);
+
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");
+        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, buff.readableBytes());
+        return response;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/OperatorHealthService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/OperatorHealthService.java
new file mode 100644
index 00000000..c0e67e8b
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/OperatorHealthService.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Manages the lifecycle of the HTTP endpoint that reports the {@link HealthProbe} state. */
+public class OperatorHealthService {
+    private static final Logger LOG = LoggerFactory.getLogger(OperatorHealthService.class);
+
+    private final HealthProbe probe = HealthProbe.INSTANCE;
+    private final FlinkConfigManager configManager;
+
+    // volatile: stop() may run on a different thread than start(), e.g. a JVM shutdown hook.
+    private volatile HttpBootstrap httpBootstrap = null;
+
+    public OperatorHealthService(FlinkConfigManager configManager) {
+        this.configManager = configManager;
+    }
+
+    /**
+     * Starts the HTTP endpoint on the port configured by {@link
+     * KubernetesOperatorConfigOptions#OPERATOR_HEALTH_PROBE_PORT} in the default configuration.
+     *
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
+     *     interrupted while the server is binding; the interrupt flag is restored first
+     */
+    public void start() {
+        int port =
+                configManager
+                        .getDefaultConfig()
+                        .get(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT);
+        try {
+            httpBootstrap = new HttpBootstrap(probe, port);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to callers further up the stack.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Stops the HTTP endpoint if it is running. This is a no-op when {@link #start()} was never
+     * called or failed, so it is safe to register as a shutdown hook before starting, and safe to
+     * call more than once.
+     */
+    public void stop() {
+        HttpBootstrap bootstrap = httpBootstrap;
+        if (bootstrap != null) {
+            httpBootstrap = null;
+            bootstrap.stop();
+        }
+    }
+
+    /**
+     * Creates the service if {@link KubernetesOperatorConfigOptions#OPERATOR_HEALTH_PROBE_ENABLED}
+     * is true in the default configuration of {@code configManager}.
+     *
+     * @return a new, not yet started service, or {@code null} if the health probe is disabled
+     */
+    public static OperatorHealthService fromConfig(FlinkConfigManager configManager) {
+        var defaultConfig = configManager.getDefaultConfig();
+        if (defaultConfig.getBoolean(
+                KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_ENABLED)) {
+            return new OperatorHealthService(configManager);
+        } else {
+            LOG.info("Health probe disabled");
+            return null;
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/OperatorHealthServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/OperatorHealthServiceTest.java
new file mode 100644
index 00000000..fea45c6f
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/OperatorHealthServiceTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.util.NetUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for Health probe and endpoint logic. */
+public class OperatorHealthServiceTest {
+
+    /** Timeout in milliseconds for connecting to and reading from the health endpoint. */
+    private static final int HTTP_TIMEOUT_MS = 10_000;
+
+    @Test
+    public void testHealthService() throws Exception {
+        try (var port = NetUtils.getAvailablePort()) {
+            var conf = new Configuration();
+            conf.set(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT, port.getPort());
+            var healthService = new OperatorHealthService(new FlinkConfigManager(conf));
+            healthService.start();
+            try {
+                assertTrue(callHealthEndpoint(conf));
+                // NOTE(review): HealthProbe offers no way to become healthy again, so the shared
+                // INSTANCE stays unhealthy for any later test running in the same JVM.
+                HealthProbe.INSTANCE.markUnhealthy();
+                assertFalse(callHealthEndpoint(conf));
+            } finally {
+                // Release the port and the Netty threads even when an assertion fails.
+                healthService.stop();
+            }
+        }
+    }
+
+    /**
+     * Sends a request to the health endpoint whose port is configured in {@code conf}.
+     *
+     * @return whether the endpoint answered with HTTP 200
+     */
+    private boolean callHealthEndpoint(Configuration conf) throws Exception {
+        URL u =
+                new URL(
+                        "http://localhost:"
+                                + conf.get(
+                                        KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT)
+                                + "/");
+        HttpURLConnection connection = (HttpURLConnection) u.openConnection();
+        // Fail fast rather than stall the build if the endpoint never answers.
+        connection.setConnectTimeout(HTTP_TIMEOUT_MS);
+        connection.setReadTimeout(HTTP_TIMEOUT_MS);
+        try {
+            connection.connect();
+            return connection.getResponseCode() == OK.code();
+        } finally {
+            connection.disconnect();
+        }
+    }
+}
diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
index abc783c1..ff8b8c9b 100644
--- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
+++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
@@ -59,12 +59,17 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
command: ["/docker-entrypoint.sh", "operator"]
- {{- if .Values.metrics.port }}
ports:
+ {{- if .Values.metrics.port }}
- containerPort: {{ .Values.metrics.port }}
name: metrics
protocol: TCP
{{- end }}
+ {{- if index .Values "operatorHealth" }}
+ - containerPort: {{ .Values.operatorHealth.port }}
+ name: health-port
+ protocol: TCP
+ {{- end }}
env:
- name: OPERATOR_NAMESPACE
value: {{ .Release.Namespace }}
@@ -90,6 +95,20 @@ spec:
{{- if .Values.operatorVolumeMounts.create }}
{{- toYaml .Values.operatorVolumeMounts.data | nindent 12 }}
{{- end }}
+ {{- if and (index .Values "operatorHealth") (index .Values.operatorHealth "livenessProbe") }}
+ livenessProbe:
+ {{- toYaml .Values.operatorHealth.livenessProbe | nindent 12 }}
+ httpGet:
+ path: /
+ port: health-port
+ {{- end }}
+ {{- if and (index .Values "operatorHealth") (index .Values.operatorHealth "startupProbe") }}
+ startupProbe:
+ {{- toYaml .Values.operatorHealth.startupProbe | nindent 12 }}
+ httpGet:
+ path: /
+ port: health-port
+ {{- end }}
{{- if eq (include "webhook-enabled" .) "true" }}
- name: flink-webhook
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
@@ -173,6 +192,10 @@ data:
{{- end }}
{{- if .Values.watchNamespaces }}
kubernetes.operator.watched.namespaces: {{ join "," .Values.watchNamespaces }}
+{{- end }}
+{{- if index .Values "operatorHealth" }}
+ kubernetes.operator.health.probe.enabled: true
+ kubernetes.operator.health.probe.port: {{ .Values.operatorHealth.port }}
{{- end }}
log4j-operator.properties: |+
{{- if .Values.defaultConfiguration.append }}
diff --git a/helm/flink-kubernetes-operator/values.yaml b/helm/flink-kubernetes-operator/values.yaml
index 7e4511ae..7bb91cc5 100644
--- a/helm/flink-kubernetes-operator/values.yaml
+++ b/helm/flink-kubernetes-operator/values.yaml
@@ -124,3 +124,17 @@ fullnameOverride: ""
 jvmArgs:
   webhook: ""
   operator: ""
+
+# Configure health probes for the operator
+operatorHealth:
+  # Port of the operator health endpoint. The chart renders it into the operator configuration
+  # as kubernetes.operator.health.probe.port and points the probes below at it.
+  port: 8085
+  # livenessProbe and startupProbe are copied into the operator container as-is, and the chart
+  # appends an httpGet on the health port. Only set timing fields; an httpGet here is duplicated.
+  livenessProbe:
+    periodSeconds: 10
+    initialDelaySeconds: 30
+  startupProbe:
+    failureThreshold: 30
+    periodSeconds: 10