You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2022/09/12 09:40:51 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28272] Handle TLS Certificate Renewal in Webhook

This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new e5a325c4 [FLINK-28272] Handle TLS Certificate Renewal in Webhook
e5a325c4 is described below

commit e5a325c48965a50d61d0aa29e61ba79e97f27082
Author: Gabor Somogyi <ga...@apple.com>
AuthorDate: Mon Sep 12 11:40:46 2022 +0200

    [FLINK-28272] Handle TLS Certificate Renewal in Webhook
---
 .../operator/fs/FileSystemWatchService.java        |  94 ++++++++++++
 .../operator/ssl/ReloadableSslContext.java         | 108 ++++++++++++++
 .../operator/fs/FileSystemWatchServiceTest.java    | 161 +++++++++++++++++++++
 .../operator/admission/FlinkOperatorWebhook.java   |  54 ++++---
 4 files changed, 396 insertions(+), 21 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchService.java
new file mode 100644
index 00000000..13378cc4
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchService.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
+
+/** Service which is able to watch local filesystem directories. */
+public class FileSystemWatchService extends Thread {
+    private static final Logger LOG = LoggerFactory.getLogger(FileSystemWatchService.class);
+
+    private final String directoryPath;
+
+    public FileSystemWatchService(String directoryPath) {
+        if (!new File(directoryPath).isDirectory()) {
+            throw new IllegalArgumentException("Directory must exists: " + directoryPath);
+        }
+        this.directoryPath = directoryPath;
+    }
+
+    @Override
+    public void run() {
+        try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
+            LOG.info("Starting watching path: " + directoryPath);
+            Path realDirectoryPath = Paths.get(directoryPath).toRealPath();
+            LOG.info("Path is resolved to real path: " + realDirectoryPath);
+            realDirectoryPath.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
+            onWatchStarted(realDirectoryPath);
+
+            while (true) {
+                LOG.debug("Taking watch key");
+                WatchKey watchKey = watcher.take();
+                LOG.debug("Watch key arrived");
+                for (WatchEvent<?> watchEvent : watchKey.pollEvents()) {
+                    LOG.debug("Watch event count: " + watchEvent.count());
+                    if (watchEvent.kind() == OVERFLOW) {
+                        LOG.error("Filesystem events may have been lost or discarded");
+                        Thread.yield();
+                    } else if (watchEvent.kind() == ENTRY_CREATE) {
+                        onFileOrDirectoryCreated((Path) watchEvent.context());
+                    } else if (watchEvent.kind() == ENTRY_DELETE) {
+                        onFileOrDirectoryDeleted((Path) watchEvent.context());
+                    } else if (watchEvent.kind() == ENTRY_MODIFY) {
+                        onFileOrDirectoryModified((Path) watchEvent.context());
+                    } else {
+                        throw new IllegalStateException("Invalid event kind: " + watchEvent.kind());
+                    }
+                }
+                watchKey.reset();
+            }
+        } catch (InterruptedException e) {
+            LOG.info("Filesystem watcher interrupted");
+        } catch (Exception e) {
+            LOG.error("Filesystem watcher received exception and stopped: " + e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected void onWatchStarted(Path realDirectoryPath) {}
+
+    protected void onFileOrDirectoryCreated(Path relativePath) {}
+
+    protected void onFileOrDirectoryDeleted(Path relativePath) {}
+
+    protected void onFileOrDirectoryModified(Path relativePath) {}
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/ssl/ReloadableSslContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/ssl/ReloadableSslContext.java
new file mode 100644
index 00000000..768a92b0
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/ssl/ReloadableSslContext.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.ssl;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http2.Http2SecurityUtil;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.ApplicationProtocolNegotiator;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SupportedCipherSuiteFilter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLSessionContext;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.security.KeyStore;
+import java.util.List;
+
+/** SSL context which is able to reload keystore. */
+public class ReloadableSslContext extends SslContext {
+    private static final Logger LOG = LoggerFactory.getLogger(ReloadableSslContext.class);
+
+    private final String keystorePath;
+    private final String keystoreType;
+    private final String keystorePassword;
+    private volatile SslContext sslContext;
+
+    public ReloadableSslContext(String keystorePath, String keystoreType, String keystorePassword)
+            throws Exception {
+        this.keystorePath = keystorePath;
+        this.keystoreType = keystoreType;
+        this.keystorePassword = keystorePassword;
+        loadContext();
+    }
+
+    @Override
+    public boolean isClient() {
+        return sslContext.isClient();
+    }
+
+    @Override
+    public List<String> cipherSuites() {
+        return sslContext.cipherSuites();
+    }
+
+    @Override
+    public ApplicationProtocolNegotiator applicationProtocolNegotiator() {
+        return sslContext.applicationProtocolNegotiator();
+    }
+
+    @Override
+    public SSLEngine newEngine(ByteBufAllocator byteBufAllocator) {
+        return sslContext.newEngine(byteBufAllocator);
+    }
+
+    @Override
+    public SSLEngine newEngine(ByteBufAllocator byteBufAllocator, String s, int i) {
+        return sslContext.newEngine(byteBufAllocator, s, i);
+    }
+
+    @Override
+    public SSLSessionContext sessionContext() {
+        return sslContext.sessionContext();
+    }
+
+    public void reload() throws Exception {
+        loadContext();
+    }
+
+    private void loadContext() throws Exception {
+        LOG.info("Creating keystore with type: " + keystoreType);
+        KeyStore keyStore = KeyStore.getInstance(keystoreType);
+        LOG.info("Loading keystore from file: " + keystorePath);
+        try (InputStream keyStoreFile = Files.newInputStream(new File(keystorePath).toPath())) {
+            keyStore.load(keyStoreFile, keystorePassword.toCharArray());
+        }
+        final KeyManagerFactory kmf =
+                KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        LOG.info("Initializing key manager with keystore and password");
+        kmf.init(keyStore, keystorePassword.toCharArray());
+
+        sslContext =
+                SslContextBuilder.forServer(kmf)
+                        .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
+                        .build();
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchServiceTest.java
new file mode 100644
index 00000000..b9075bb1
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/fs/FileSystemWatchServiceTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.fs;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/** Test for filesystem watch service. */
+class FileSystemWatchServiceTest {
+    private static final Logger LOG = LoggerFactory.getLogger(FileSystemWatchServiceTest.class);
+
+    private @TempDir Path tmpDir;
+    private String fileName;
+    private Path fileFullPath;
+    private CountDownLatch watchStartedLatch;
+    private CountDownLatch watchEventArrivedLatch;
+    private FileSystemWatchService fileSystemWatchService;
+
+    @BeforeEach
+    public void beforeEach() {
+        fileName = UUID.randomUUID().toString();
+        fileFullPath = Paths.get(tmpDir.toString(), fileName);
+        watchStartedLatch = new CountDownLatch(1);
+        watchEventArrivedLatch = new CountDownLatch(1);
+        fileSystemWatchService = null;
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (fileSystemWatchService != null) {
+            fileSystemWatchService.interrupt();
+            fileSystemWatchService.join(10_000);
+            fileSystemWatchService = null;
+        }
+    }
+
+    @Test
+    public void testMissingDirectory() {
+        Assertions.assertThrows(
+                IllegalArgumentException.class,
+                () -> {
+                    new FileSystemWatchService("/intentionally/missing/directory");
+                });
+    }
+
+    @Test
+    public void testFileCreateEvent() throws Exception {
+        FileSystemWatchService fileSystemWatchService =
+                new FileSystemWatchService(tmpDir.toString()) {
+                    @Override
+                    protected void onWatchStarted(Path realDirectoryPath) {
+                        watchStartedLatch.countDown();
+                    }
+
+                    @Override
+                    protected void onFileOrDirectoryCreated(Path relativePath) {
+                        if (relativePath.toString().equals(fileName)) {
+                            watchEventArrivedLatch.countDown();
+                        }
+                    }
+                };
+        fileSystemWatchService.start();
+        Assertions.assertTrue(watchStartedLatch.await(10, TimeUnit.SECONDS));
+
+        Files.createFile(Paths.get(tmpDir.toString(), fileName));
+
+        Assertions.assertTrue(watchEventArrivedLatch.await(1, TimeUnit.MINUTES));
+    }
+
+    @Test
+    public void testFileDeleteEvent() throws Exception {
+        Files.createFile(fileFullPath);
+        Assertions.assertEquals(1, Files.list(tmpDir).count());
+        FileSystemWatchService fileSystemWatchService =
+                new FileSystemWatchService(tmpDir.toString()) {
+                    @Override
+                    protected void onWatchStarted(Path realDirectoryPath) {
+                        LOG.info("onWatchStarted");
+                        watchStartedLatch.countDown();
+                    }
+
+                    @Override
+                    protected void onFileOrDirectoryDeleted(Path relativePath) {
+                        LOG.info("onFileOrDirectoryDeleted");
+                        if (relativePath.toString().equals(fileName)) {
+                            watchEventArrivedLatch.countDown();
+                        }
+                    }
+                };
+        fileSystemWatchService.start();
+        Assertions.assertTrue(watchStartedLatch.await(10, TimeUnit.SECONDS));
+
+        Files.delete(fileFullPath);
+
+        Assertions.assertEquals(0, Files.list(tmpDir).count());
+        Assertions.assertTrue(watchEventArrivedLatch.await(1, TimeUnit.MINUTES));
+    }
+
+    @Test
+    public void testFileModifyEvent() throws Exception {
+        writeFile("1");
+
+        FileSystemWatchService fileSystemWatchService =
+                new FileSystemWatchService(tmpDir.toString()) {
+                    @Override
+                    protected void onWatchStarted(Path realDirectoryPath) {
+                        watchStartedLatch.countDown();
+                    }
+
+                    @Override
+                    protected void onFileOrDirectoryModified(Path relativePath) {
+                        if (relativePath.toString().equals(fileName)) {
+                            watchEventArrivedLatch.countDown();
+                        }
+                    }
+                };
+        fileSystemWatchService.start();
+        Assertions.assertTrue(watchStartedLatch.await(10, TimeUnit.SECONDS));
+
+        writeFile("2");
+
+        Assertions.assertTrue(watchEventArrivedLatch.await(1, TimeUnit.MINUTES));
+    }
+
+    private void writeFile(String content) throws IOException {
+        Files.write(
+                fileFullPath,
+                content.getBytes(),
+                StandardOpenOption.CREATE,
+                StandardOpenOption.TRUNCATE_EXISTING);
+    }
+}
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index 9a7c03db..cd2e813f 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -20,6 +20,8 @@ package org.apache.flink.kubernetes.operator.admission;
 import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.fs.FileSystemWatchService;
+import org.apache.flink.kubernetes.operator.ssl.ReloadableSslContext;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
@@ -32,24 +34,16 @@ import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http2.Http2SecurityUtil;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SupportedCipherSuiteFilter;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.KeyManagerFactory;
-
-import java.io.File;
-import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.nio.file.Files;
-import java.security.KeyStore;
+import java.nio.file.Path;
 import java.util.Set;
 
 /** Main Class for Flink native k8s operator. */
@@ -58,6 +52,8 @@ public class FlinkOperatorWebhook {
 
     private static final int MAX_CONTEXT_LENGTH = 104_857_600;
 
+    private static FileSystemWatchService fileSystemWatchService;
+
     public static void main(String[] args) throws Exception {
         EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Webhook", args);
         var informerManager = new InformerManager(new DefaultKubernetesClient());
@@ -138,19 +134,35 @@ public class FlinkOperatorWebhook {
             return null;
         }
 
-        String keystorePassword = EnvUtils.getRequired(EnvUtils.ENV_WEBHOOK_KEYSTORE_PASSWORD);
         String keystoreType = EnvUtils.getRequired(EnvUtils.ENV_WEBHOOK_KEYSTORE_TYPE);
-        KeyStore keyStore = KeyStore.getInstance(keystoreType);
-        try (InputStream keyStoreFile =
-                Files.newInputStream(new File(keystorePathOpt.get()).toPath())) {
-            keyStore.load(keyStoreFile, keystorePassword.toCharArray());
-        }
-        final KeyManagerFactory kmf =
-                KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-        kmf.init(keyStore, keystorePassword.toCharArray());
+        String keystorePassword = EnvUtils.getRequired(EnvUtils.ENV_WEBHOOK_KEYSTORE_PASSWORD);
+        ReloadableSslContext reloadableSslContext =
+                new ReloadableSslContext(keystorePathOpt.get(), keystoreType, keystorePassword);
+        stopFileSystemWatchService();
+        final String realKeystoreFileName =
+                Path.of(keystorePathOpt.get()).toRealPath().getFileName().toString();
+        LOG.info("Keystore path is resolved to real filename: " + realKeystoreFileName);
+        fileSystemWatchService =
+                new FileSystemWatchService(Path.of(keystorePathOpt.get()).getParent().toString()) {
+                    @Override
+                    protected void onFileOrDirectoryModified(Path relativePath) {
+                        try {
+                            LOG.info("Reloading SSL context because of certificate change");
+                            reloadableSslContext.reload();
+                            LOG.info("SSL context reloaded successfully");
+                        } catch (Exception e) {
+                            LOG.error("SSL context reload received exception: " + e);
+                        }
+                    }
+                };
+        fileSystemWatchService.start();
+        return reloadableSslContext;
+    }
 
-        return SslContextBuilder.forServer(kmf)
-                .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
-                .build();
+    private static void stopFileSystemWatchService() throws InterruptedException {
+        if (fileSystemWatchService != null) {
+            fileSystemWatchService.interrupt();
+            fileSystemWatchService.join();
+        }
     }
 }