You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2022/09/12 09:40:51 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28272] Handle TLS Certificate Renewal in Webhook
This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new e5a325c4 [FLINK-28272] Handle TLS Certificate Renewal in Webhook
e5a325c4 is described below
commit e5a325c48965a50d61d0aa29e61ba79e97f27082
Author: Gabor Somogyi <ga...@apple.com>
AuthorDate: Mon Sep 12 11:40:46 2022 +0200
[FLINK-28272] Handle TLS Certificate Renewal in Webhook
---
.../operator/fs/FileSystemWatchService.java | 94 ++++++++++++
.../operator/ssl/ReloadableSslContext.java | 108 ++++++++++++++
.../operator/fs/FileSystemWatchServiceTest.java | 161 +++++++++++++++++++++
.../operator/admission/FlinkOperatorWebhook.java | 54 ++++---
4 files changed, 396 insertions(+), 21 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchService.java
new file mode 100644
index 00000000..13378cc4
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchService.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
+
+/** Service which is able to watch local filesystem directories. */
+public class FileSystemWatchService extends Thread {
+ private static final Logger LOG = LoggerFactory.getLogger(FileSystemWatchService.class);
+
+ private final String directoryPath;
+
+ public FileSystemWatchService(String directoryPath) {
+ if (!new File(directoryPath).isDirectory()) {
+ throw new IllegalArgumentException("Directory must exists: " + directoryPath);
+ }
+ this.directoryPath = directoryPath;
+ }
+
+ @Override
+ public void run() {
+ try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
+ LOG.info("Starting watching path: " + directoryPath);
+ Path realDirectoryPath = Paths.get(directoryPath).toRealPath();
+ LOG.info("Path is resolved to real path: " + realDirectoryPath);
+ realDirectoryPath.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
+ onWatchStarted(realDirectoryPath);
+
+ while (true) {
+ LOG.debug("Taking watch key");
+ WatchKey watchKey = watcher.take();
+ LOG.debug("Watch key arrived");
+ for (WatchEvent<?> watchEvent : watchKey.pollEvents()) {
+ LOG.debug("Watch event count: " + watchEvent.count());
+ if (watchEvent.kind() == OVERFLOW) {
+ LOG.error("Filesystem events may have been lost or discarded");
+ Thread.yield();
+ } else if (watchEvent.kind() == ENTRY_CREATE) {
+ onFileOrDirectoryCreated((Path) watchEvent.context());
+ } else if (watchEvent.kind() == ENTRY_DELETE) {
+ onFileOrDirectoryDeleted((Path) watchEvent.context());
+ } else if (watchEvent.kind() == ENTRY_MODIFY) {
+ onFileOrDirectoryModified((Path) watchEvent.context());
+ } else {
+ throw new IllegalStateException("Invalid event kind: " + watchEvent.kind());
+ }
+ }
+ watchKey.reset();
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Filesystem watcher interrupted");
+ } catch (Exception e) {
+ LOG.error("Filesystem watcher received exception and stopped: " + e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void onWatchStarted(Path realDirectoryPath) {}
+
+ protected void onFileOrDirectoryCreated(Path relativePath) {}
+
+ protected void onFileOrDirectoryDeleted(Path relativePath) {}
+
+ protected void onFileOrDirectoryModified(Path relativePath) {}
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/ssl/ReloadableSslContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/ssl/ReloadableSslContext.java
new file mode 100644
index 00000000..768a92b0
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/ssl/ReloadableSslContext.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.ssl;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http2.Http2SecurityUtil;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.ApplicationProtocolNegotiator;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SupportedCipherSuiteFilter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLSessionContext;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.security.KeyStore;
+import java.util.List;
+
+/** SSL context which is able to reload keystore. */
+public class ReloadableSslContext extends SslContext {
+ private static final Logger LOG = LoggerFactory.getLogger(ReloadableSslContext.class);
+
+ private final String keystorePath;
+ private final String keystoreType;
+ private final String keystorePassword;
+ private volatile SslContext sslContext;
+
+ public ReloadableSslContext(String keystorePath, String keystoreType, String keystorePassword)
+ throws Exception {
+ this.keystorePath = keystorePath;
+ this.keystoreType = keystoreType;
+ this.keystorePassword = keystorePassword;
+ loadContext();
+ }
+
+ @Override
+ public boolean isClient() {
+ return sslContext.isClient();
+ }
+
+ @Override
+ public List<String> cipherSuites() {
+ return sslContext.cipherSuites();
+ }
+
+ @Override
+ public ApplicationProtocolNegotiator applicationProtocolNegotiator() {
+ return sslContext.applicationProtocolNegotiator();
+ }
+
+ @Override
+ public SSLEngine newEngine(ByteBufAllocator byteBufAllocator) {
+ return sslContext.newEngine(byteBufAllocator);
+ }
+
+ @Override
+ public SSLEngine newEngine(ByteBufAllocator byteBufAllocator, String s, int i) {
+ return sslContext.newEngine(byteBufAllocator, s, i);
+ }
+
+ @Override
+ public SSLSessionContext sessionContext() {
+ return sslContext.sessionContext();
+ }
+
+ public void reload() throws Exception {
+ loadContext();
+ }
+
+ private void loadContext() throws Exception {
+ LOG.info("Creating keystore with type: " + keystoreType);
+ KeyStore keyStore = KeyStore.getInstance(keystoreType);
+ LOG.info("Loading keystore from file: " + keystorePath);
+ try (InputStream keyStoreFile = Files.newInputStream(new File(keystorePath).toPath())) {
+ keyStore.load(keyStoreFile, keystorePassword.toCharArray());
+ }
+ final KeyManagerFactory kmf =
+ KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ LOG.info("Initializing key manager with keystore and password");
+ kmf.init(keyStore, keystorePassword.toCharArray());
+
+ sslContext =
+ SslContextBuilder.forServer(kmf)
+ .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
+ .build();
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchServiceTest.java
new file mode 100644
index 00000000..b9075bb1
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchServiceTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.fs;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/** Test for filesystem watch service. */
+class FileSystemWatchServiceTest {
+ private static final Logger LOG = LoggerFactory.getLogger(FileSystemWatchServiceTest.class);
+
+ private @TempDir Path tmpDir;
+ private String fileName;
+ private Path fileFullPath;
+ private CountDownLatch watchStartedLatch;
+ private CountDownLatch watchEventArrivedLatch;
+ private FileSystemWatchService fileSystemWatchService;
+
+ @BeforeEach
+ public void beforeEach() {
+ fileName = UUID.randomUUID().toString();
+ fileFullPath = Paths.get(tmpDir.toString(), fileName);
+ watchStartedLatch = new CountDownLatch(1);
+ watchEventArrivedLatch = new CountDownLatch(1);
+ fileSystemWatchService = null;
+ }
+
+ @AfterEach
+ public void afterEach() throws InterruptedException {
+ if (fileSystemWatchService != null) {
+ fileSystemWatchService.interrupt();
+ fileSystemWatchService.join(10_000);
+ fileSystemWatchService = null;
+ }
+ }
+
+ @Test
+ public void testMissingDirectory() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> {
+ new FileSystemWatchService("/intentionally/missing/directory");
+ });
+ }
+
+ @Test
+ public void testFileCreateEvent() throws Exception {
+ FileSystemWatchService fileSystemWatchService =
+ new FileSystemWatchService(tmpDir.toString()) {
+ @Override
+ protected void onWatchStarted(Path realDirectoryPath) {
+ watchStartedLatch.countDown();
+ }
+
+ @Override
+ protected void onFileOrDirectoryCreated(Path relativePath) {
+ if (relativePath.toString().equals(fileName)) {
+ watchEventArrivedLatch.countDown();
+ }
+ }
+ };
+ fileSystemWatchService.start();
+ Assertions.assertTrue(watchStartedLatch.await(10, TimeUnit.SECONDS));
+
+ Files.createFile(Paths.get(tmpDir.toString(), fileName));
+
+ Assertions.assertTrue(watchEventArrivedLatch.await(1, TimeUnit.MINUTES));
+ }
+
+ @Test
+ public void testFileDeleteEvent() throws Exception {
+ Files.createFile(fileFullPath);
+ Assertions.assertEquals(1, Files.list(tmpDir).count());
+ FileSystemWatchService fileSystemWatchService =
+ new FileSystemWatchService(tmpDir.toString()) {
+ @Override
+ protected void onWatchStarted(Path realDirectoryPath) {
+ LOG.info("onWatchStarted");
+ watchStartedLatch.countDown();
+ }
+
+ @Override
+ protected void onFileOrDirectoryDeleted(Path relativePath) {
+ LOG.info("onFileOrDirectoryDeleted");
+ if (relativePath.toString().equals(fileName)) {
+ watchEventArrivedLatch.countDown();
+ }
+ }
+ };
+ fileSystemWatchService.start();
+ Assertions.assertTrue(watchStartedLatch.await(10, TimeUnit.SECONDS));
+
+ Files.delete(fileFullPath);
+
+ Assertions.assertEquals(0, Files.list(tmpDir).count());
+ Assertions.assertTrue(watchEventArrivedLatch.await(1, TimeUnit.MINUTES));
+ }
+
+ @Test
+ public void testFileModifyEvent() throws Exception {
+ writeFile("1");
+
+ FileSystemWatchService fileSystemWatchService =
+ new FileSystemWatchService(tmpDir.toString()) {
+ @Override
+ protected void onWatchStarted(Path realDirectoryPath) {
+ watchStartedLatch.countDown();
+ }
+
+ @Override
+ protected void onFileOrDirectoryModified(Path relativePath) {
+ if (relativePath.toString().equals(fileName)) {
+ watchEventArrivedLatch.countDown();
+ }
+ }
+ };
+ fileSystemWatchService.start();
+ Assertions.assertTrue(watchStartedLatch.await(10, TimeUnit.SECONDS));
+
+ writeFile("2");
+
+ Assertions.assertTrue(watchEventArrivedLatch.await(1, TimeUnit.MINUTES));
+ }
+
+ private void writeFile(String content) throws IOException {
+ Files.write(
+ fileFullPath,
+ content.getBytes(),
+ StandardOpenOption.CREATE,
+ StandardOpenOption.TRUNCATE_EXISTING);
+ }
+}
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index 9a7c03db..cd2e813f 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -20,6 +20,8 @@ package org.apache.flink.kubernetes.operator.admission;
import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.fs.FileSystemWatchService;
+import org.apache.flink.kubernetes.operator.ssl.ReloadableSslContext;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
@@ -32,24 +34,16 @@ import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http2.Http2SecurityUtil;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SupportedCipherSuiteFilter;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.KeyManagerFactory;
-
-import java.io.File;
-import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.nio.file.Files;
-import java.security.KeyStore;
+import java.nio.file.Path;
import java.util.Set;
/** Main Class for Flink native k8s operator. */
@@ -58,6 +52,8 @@ public class FlinkOperatorWebhook {
private static final int MAX_CONTEXT_LENGTH = 104_857_600;
+ private static FileSystemWatchService fileSystemWatchService;
+
public static void main(String[] args) throws Exception {
EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Webhook", args);
var informerManager = new InformerManager(new DefaultKubernetesClient());
@@ -138,19 +134,35 @@ public class FlinkOperatorWebhook {
return null;
}
- String keystorePassword = EnvUtils.getRequired(EnvUtils.ENV_WEBHOOK_KEYSTORE_PASSWORD);
String keystoreType = EnvUtils.getRequired(EnvUtils.ENV_WEBHOOK_KEYSTORE_TYPE);
- KeyStore keyStore = KeyStore.getInstance(keystoreType);
- try (InputStream keyStoreFile =
- Files.newInputStream(new File(keystorePathOpt.get()).toPath())) {
- keyStore.load(keyStoreFile, keystorePassword.toCharArray());
- }
- final KeyManagerFactory kmf =
- KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- kmf.init(keyStore, keystorePassword.toCharArray());
+ String keystorePassword = EnvUtils.getRequired(EnvUtils.ENV_WEBHOOK_KEYSTORE_PASSWORD);
+ ReloadableSslContext reloadableSslContext =
+ new ReloadableSslContext(keystorePathOpt.get(), keystoreType, keystorePassword);
+ stopFileSystemWatchService();
+ final String realKeystoreFileName =
+ Path.of(keystorePathOpt.get()).toRealPath().getFileName().toString();
+ LOG.info("Keystore path is resolved to real filename: " + realKeystoreFileName);
+ fileSystemWatchService =
+ new FileSystemWatchService(Path.of(keystorePathOpt.get()).getParent().toString()) {
+ @Override
+ protected void onFileOrDirectoryModified(Path relativePath) {
+ try {
+ LOG.info("Reloading SSL context because of certificate change");
+ reloadableSslContext.reload();
+ LOG.info("SSL context reloaded successfully");
+ } catch (Exception e) {
+ LOG.error("SSL context reload received exception: " + e);
+ }
+ }
+ };
+ fileSystemWatchService.start();
+ return reloadableSslContext;
+ }
- return SslContextBuilder.forServer(kmf)
- .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
- .build();
+    /** Interrupts the current watcher thread, if there is one, and waits for it to terminate. */
+    private static void stopFileSystemWatchService() throws InterruptedException {
+        // Nothing to stop until a watcher has been created.
+        if (fileSystemWatchService == null) {
+            return;
+        }
+        fileSystemWatchService.interrupt();
+        fileSystemWatchService.join();
     }
}