You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/01 10:08:51 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10156: KAFKA-10345: File watch store reloading

rajinisivaram commented on a change in pull request #10156:
URL: https://github.com/apache/kafka/pull/10156#discussion_r642939618



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
##########
@@ -98,7 +99,7 @@ private ClientUtils() {
      *
      * @return configured ChannelBuilder based on the configs.
      */
-    public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time, LogContext logContext) {
+    public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time, LogContext logContext) throws IOException {

Review comment:
       Rather than propagating IOException everywhere, couldn't we use one of the Kafka exceptions that is not a checked exception?

##########
File path: clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##########
@@ -70,7 +73,7 @@ public static ChannelBuilder clientChannelBuilder(
             String clientSaslMechanism,
             Time time,
             boolean saslHandshakeRequestEnable,
-            LogContext logContext) {
+            LogContext logContext) throws IOException {

Review comment:
       As before, IOException propagation seems too noisy.

##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -127,6 +158,131 @@ public KeyStore truststore() {
         return this.truststore != null ? this.truststore.get() : null;
     }
 
+    class SecurityFileChangeListener implements Runnable {
+        private final Timer keyStoreRefreshTimer;
+        private final Timer trustStoreRefreshTimer;
+        private final Map<WatchKey, Path> watchKeyPathMap = new HashMap<>();
+        private final Map<Path, SecurityStore> fileToStoreMap = new HashMap<>();
+        private final AtomicReference<Exception> lastLoadFailure = new AtomicReference<>(null);
+
+        SecurityFileChangeListener(final Timer keyStoreRefreshTimer,
+                                   final Timer trustStoreRefreshTimer) {
+            this.keyStoreRefreshTimer = keyStoreRefreshTimer;
+            this.trustStoreRefreshTimer = trustStoreRefreshTimer;
+        }
+
+        void updateStoreKey(SecurityStore store, final String watchFile) {
+            try {
+                Path filePath = Paths.get(watchFile);
+                fileToStoreMap.put(filePath, store);
+
+                Path dirPath = filePath.getParent();
+                if (dirPath == null) {
+                    throw new IOException("Unexpected null path with no parent");
+                }
+
+                if (!Files.exists(dirPath)) {
+                    Files.createDirectories(dirPath);
+                }
+                WatchKey watchkey = dirPath.register(watchService,
+                    StandardWatchEventKinds.ENTRY_CREATE,
+                    StandardWatchEventKinds.ENTRY_MODIFY,
+                    StandardWatchEventKinds.ENTRY_DELETE,
+                    StandardWatchEventKinds.OVERFLOW);
+                watchKeyPathMap.put(watchkey, dirPath);
+                log.info("Watch service registered for store path = {}", dirPath);
+            } catch (IOException e) {
+                // If the update failed, we will try to use existing store path instead.
+                log.error("Could not register store path for file {}", watchFile, e);
+            }
+        }
+
+        // For testing purpose now.
+        Exception lastLoadFailure() {
+            return lastLoadFailure.get();
+        }
+
+        public void run() {
+            for (Map.Entry<WatchKey, Path> key : watchKeyPathMap.entrySet()) {
+                log.debug("Starting listening for change key {} for path {}", key.getKey(), key.getValue());
+            }
+            resetKeyStoreTimer();
+            resetTrustStoreTimer();
+
+            try {
+                runLoop();
+            } catch (InterruptedException ie) {
+                log.debug("Security file listener {} was interrupted to shutdown", watchKeyPathMap);
+            } catch (Exception e) {
+                log.warn("Hit a fatal exception in security file listener", e);
+            }
+        }
+
+        private void runLoop() throws InterruptedException {
+            while (!watchKeyPathMap.isEmpty()) {
+                keyStoreRefreshTimer.update();
+                trustStoreRefreshTimer.update();
+                final long maxPollIntervalMs = Math.min(keyStoreRefreshTimer.remainingMs(),
+                    trustStoreRefreshTimer.remainingMs());
+                log.debug("Max poll interval is {} with trust store remaining time {} and trust store time {}",
+                    maxPollIntervalMs, trustStoreRefreshTimer.remainingMs(), trustStoreRefreshIntervalMs);
+                WatchKey watchKey = watchService.poll(maxPollIntervalMs, TimeUnit.MILLISECONDS);
+
+                // Handle file update triggered events.
+                if (watchKey != null && watchKeyPathMap.containsKey(watchKey)) {
+                    for (WatchEvent<?> event: watchKey.pollEvents()) {
+                        log.debug("Got file change event: {} {}", event.kind(), event.context());
+
+                        @SuppressWarnings("unchecked")
+                        Path filePath = watchKeyPathMap.get(watchKey).resolve(((WatchEvent<Path>) event).context());
+
+                        if (fileToStoreMap.containsKey(filePath)) {
+                            maybeReloadStore(fileToStoreMap.get(filePath));
+                            sslContext.set(createSSLContext(keystore, truststore));
+                        } else {
+                            log.debug("Unknown file name: {}", filePath);
+                        }
+                    }
+                    if (!watchKey.reset()) {
+                        watchKeyPathMap.remove(watchKey);
+                    }
+                }
+
+                keyStoreRefreshTimer.update();
+                if (keyStoreRefreshTimer.isExpired()) {
+                    maybeReloadStore(keystore);
+                    resetKeyStoreTimer();
+                }
+
+                trustStoreRefreshTimer.update();
+                if (trustStoreRefreshTimer.isExpired()) {
+                    maybeReloadStore(truststore);
+                    resetTrustStoreTimer();
+                }
+            }
+        }
+
+        private void resetKeyStoreTimer() {
+            keyStoreRefreshTimer.updateAndReset(keyStoreRefreshIntervalMs);
+        }
+
+        private void resetTrustStoreTimer() {
+            trustStoreRefreshTimer.updateAndReset(trustStoreRefreshIntervalMs);
+        }
+
+        private void maybeReloadStore(SecurityStore store) {
+            try {
+                if (!(store instanceof FileBasedStore))
+                    throw new IllegalStateException("Store " + store + " is expected to be file-based for reloading.");
+                ((FileBasedStore) store).reloadStore(store == keystore);
+                log.info("Reloaded store {}", store);
+            } catch (Exception e) {
+                log.error("Encountered a load failure for store {}", store, e);

Review comment:
       Do we have tests that verify that if the file was deleted or corrupted, we continue to work with the old stores?

##########
File path: clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
##########
@@ -249,6 +253,8 @@ public void close()  {
         for (AuthenticateCallbackHandler handler : saslCallbackHandlers.values())
             handler.close();
         if (sslFactory != null) sslFactory.close();
+
+        Utils.closeQuietly(watchService, "Watch service");

Review comment:
       If `watchService` is being propagated from outside because we want to share it or reuse it, we shouldn't be closing it here. Otherwise, we could avoid injecting it and let `SslFactory` create it.

##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -73,6 +84,7 @@
 
     private static final Logger log = LoggerFactory.getLogger(DefaultSslEngineFactory.class);
     public static final String PEM_TYPE = "PEM";
+    private static final long DEFAULT_SECURITY_STORE_REFRESH_INTERVAL_MS = 300_000L;

Review comment:
       Shouldn't defaults come from the config default?

##########
File path: clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
##########
@@ -103,7 +110,12 @@
     public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
 
     public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
-    public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. ";
+    public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file.";
+
+    public static final String SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG = "ssl.truststore.location.refresh.interval.ms";
+    public static final String SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_DOC = "The refresh interval for in-place ssl truststore updates. In general, "
+        + "the update should trigger immediately when user modifies the security file path through file watch service, while "
+        + "this configuration is defining a time based guarantee of store reloading in worst case";

Review comment:
       Same comment for docs as for the keystore refresh interval (`ssl.keystore.location.refresh.interval.ms`).

##########
File path: clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
##########
@@ -116,6 +120,7 @@ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize
     @Override
     public void close() {
         if (sslFactory != null) sslFactory.close();
+        Utils.closeQuietly(watchService, "Watch service");

Review comment:
       As before, either let `SslFactory` create and close this, or use a shared watch service.

##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -81,12 +93,34 @@
     private String tmfAlgorithm;
     private SecurityStore keystore;
     private SecurityStore truststore;
+    private long keyStoreRefreshIntervalMs = DEFAULT_SECURITY_STORE_REFRESH_INTERVAL_MS;
+    private long trustStoreRefreshIntervalMs = DEFAULT_SECURITY_STORE_REFRESH_INTERVAL_MS;
+    private final SecurityFileChangeListener securityFileChangeListener;
+    private final Thread securityStoreRefreshThread;
+    private final WatchService watchService;
+
     private String[] cipherSuites;
     private String[] enabledProtocols;
     private SecureRandom secureRandomImplementation;
-    private SSLContext sslContext;
+    private AtomicReference<SSLContext> sslContext = new AtomicReference<>(null);
     private SslClientAuth sslClientAuth;
 
+    public DefaultSslEngineFactory(final WatchService watchService) {
+        this(watchService, SystemTime.SYSTEM);
+    }
+
+    // For testing only
+    DefaultSslEngineFactory(final WatchService watchService,
+                            final Time time) {
+        this.securityFileChangeListener = new SecurityFileChangeListener(time.timer(Long.MAX_VALUE), time.timer(Long.MAX_VALUE));
+        this.securityStoreRefreshThread = new Thread(securityFileChangeListener, "security-store-refresh-thread");

Review comment:
       Since this only applies to file-based key/trust stores, we should check if these are required before creating watcher/threads.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -441,7 +442,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
     }
 
     // visible for testing
-    Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
+    Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) throws IOException {

Review comment:
       Changes to this file can be reverted if we don't propagate IOException.

##########
File path: clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##########
@@ -180,7 +183,8 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
                         sslClientAuthOverride,
                         time,
                         logContext,
-                        apiVersionSupplier);
+                        apiVersionSupplier,
+                        createWatchService());

Review comment:
       Not sure what we decided - are we going to have a mechanism to disable watching, e.g. in clients?

##########
File path: clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
##########
@@ -86,9 +88,14 @@
         + "Default SSL engine factory supports only PEM format with X.509 certificates.";
 
     public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
-    public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. "
+    public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file."
         + "This is optional for client and can be used for two-way authentication for client.";
 
+    public static final String SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG = "ssl.keystore.location.refresh.interval.ms";
+    public static final String SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_DOC = "The refresh interval for in-place ssl keystore updates. In general, "
+        + "the update should trigger immediately when user modifies the security file path through file watch service, while "
+        +  "this configuration is defining a time based guarantee of store reloading in worst case";

Review comment:
       We should clarify that this only applies to SSL keystores defined using `ssl.keystore.location` with the default SSL engine factory. It doesn't apply to custom factories or stores configured using `ssl.keystore.certificate.chain`. May also want to describe the behaviour if store parameters like password change.

##########
File path: clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
##########
@@ -195,7 +195,7 @@ private TestJaasConfig configureMechanisms(String clientMechanism, List<String>
         return TestJaasConfig.createConfiguration(clientMechanism, serverMechanisms);
     }
 
-    private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> clientConfigs) {
+    private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> clientConfigs) throws IOException {

Review comment:
       I think quite a few of the files have changes that are related to either the propagation of IOException or the propagation of a watch service, both of which can be reverted.

##########
File path: clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
##########
@@ -313,12 +322,124 @@ private String pemFilePath(String pem) throws Exception {
         return TestUtils.tempFile(pem).getAbsolutePath();
     }
 
-    private Password pemAsConfigValue(String... pemValues)  throws Exception {
+    private Password pemAsConfigValue(String... pemValues) {
         StringBuilder builder = new StringBuilder();
         for (String pem : pemValues) {
             builder.append(pem);
             builder.append("\n");
         }
         return new Password(builder.toString().trim());
     }
+
+    @Test
+    public void testKeyStoreFileTriggerReload() throws Exception {
+        MockTime time = new MockTime(0L, 0L, 0L);
+        DefaultSslEngineFactory factory = new DefaultSslEngineFactory(time);
+        configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L);
+        configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
+
+        final String filePath = pemFilePath(pemAsConfigValue(ENCRYPTED_KEY, CERTCHAIN).value());
+
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, filePath);
+        configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD);
+        configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE);
+        factory.configure(configs);
+
+        // Make sure the thread starts to listen for file changes.
+        sleep(1000);

Review comment:
       Yes, we should remove the `sleep` calls from the tests and ensure the tests work without them.

##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -127,6 +158,131 @@ public KeyStore truststore() {
         return this.truststore != null ? this.truststore.get() : null;
     }
 
+    class SecurityFileChangeListener implements Runnable {
+        private final Timer keyStoreRefreshTimer;
+        private final Timer trustStoreRefreshTimer;
+        private final Map<WatchKey, Path> watchKeyPathMap = new HashMap<>();
+        private final Map<Path, SecurityStore> fileToStoreMap = new HashMap<>();
+        private final AtomicReference<Exception> lastLoadFailure = new AtomicReference<>(null);
+
+        SecurityFileChangeListener(final Timer keyStoreRefreshTimer,
+                                   final Timer trustStoreRefreshTimer) {
+            this.keyStoreRefreshTimer = keyStoreRefreshTimer;
+            this.trustStoreRefreshTimer = trustStoreRefreshTimer;
+        }
+
+        void updateStoreKey(SecurityStore store, final String watchFile) {
+            try {
+                Path filePath = Paths.get(watchFile);
+                fileToStoreMap.put(filePath, store);
+
+                Path dirPath = filePath.getParent();
+                if (dirPath == null) {
+                    throw new IOException("Unexpected null path with no parent");
+                }
+
+                if (!Files.exists(dirPath)) {
+                    Files.createDirectories(dirPath);
+                }
+                WatchKey watchkey = dirPath.register(watchService,
+                    StandardWatchEventKinds.ENTRY_CREATE,
+                    StandardWatchEventKinds.ENTRY_MODIFY,
+                    StandardWatchEventKinds.ENTRY_DELETE,
+                    StandardWatchEventKinds.OVERFLOW);
+                watchKeyPathMap.put(watchkey, dirPath);
+                log.info("Watch service registered for store path = {}", dirPath);
+            } catch (IOException e) {
+                // If the update failed, we will try to use existing store path instead.
+                log.error("Could not register store path for file {}", watchFile, e);
+            }
+        }
+
+        // For testing purpose now.
+        Exception lastLoadFailure() {
+            return lastLoadFailure.get();
+        }
+
+        public void run() {
+            for (Map.Entry<WatchKey, Path> key : watchKeyPathMap.entrySet()) {
+                log.debug("Starting listening for change key {} for path {}", key.getKey(), key.getValue());
+            }
+            resetKeyStoreTimer();
+            resetTrustStoreTimer();
+
+            try {
+                runLoop();
+            } catch (InterruptedException ie) {
+                log.debug("Security file listener {} was interrupted to shutdown", watchKeyPathMap);
+            } catch (Exception e) {
+                log.warn("Hit a fatal exception in security file listener", e);

Review comment:
       Should this be at ERROR level?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org