Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/18 21:56:17 UTC

[GitHub] [kafka] abbccdda opened a new pull request #10156: KAFKA-10345 (WIP): File watch store reloading

abbccdda opened a new pull request #10156:
URL: https://github.com/apache/kafka/pull/10156


   Add a file-based store reloading mechanism that runs in a separate thread and reloads stores both when the file watch service reports a change and periodically on a timer.
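
A minimal sketch of the idea in the description, assuming a single watched file and a plain `Runnable` reload callback. The class name `FileWatchReloader` and its constructor are hypothetical, not the PR's API. The thread blocks in `WatchService.poll` for at most the refresh interval, so a file event triggers an immediate reload and the timeout provides the time-based fallback.

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (not the PR's class): reloads when the watched file changes,
// and in any case at least once every refreshIntervalMs.
final class FileWatchReloader implements Runnable {
    private final Path watchedFile;
    private final long refreshIntervalMs;
    private final Runnable reloadAction;
    private final WatchService watchService;

    FileWatchReloader(Path file, long refreshIntervalMs, Runnable reloadAction) throws IOException {
        this.watchedFile = file.toAbsolutePath();
        this.refreshIntervalMs = refreshIntervalMs;
        this.reloadAction = reloadAction;
        this.watchService = FileSystems.getDefault().newWatchService();
        // A WatchService watches directories, so register the file's parent directory.
        watchedFile.getParent().register(watchService,
            StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
    }

    @Override
    public void run() {
        long deadlineMs = System.currentTimeMillis() + refreshIntervalMs;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Block until a file event arrives or the time-based refresh is due.
                long waitMs = Math.max(0, deadlineMs - System.currentTimeMillis());
                WatchKey key = watchService.poll(waitMs, TimeUnit.MILLISECONDS);
                boolean reload = System.currentTimeMillis() >= deadlineMs;
                if (key != null) {
                    Path dir = (Path) key.watchable();
                    for (WatchEvent<?> event : key.pollEvents()) {
                        // OVERFLOW means events were lost, so reload to be safe.
                        if (event.kind() == StandardWatchEventKinds.OVERFLOW
                                || dir.resolve((Path) event.context()).equals(watchedFile)) {
                            reload = true;
                        }
                    }
                    if (!key.reset()) {
                        return; // the directory can no longer be watched; stop the thread
                    }
                }
                if (reload) {
                    reloadAction.run();
                    deadlineMs = System.currentTimeMillis() + refreshIntervalMs;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // interruption is the shutdown signal
        } finally {
            try {
                watchService.close();
            } catch (IOException ignored) {
                // nothing useful to do during shutdown
            }
        }
    }
}
```

With separate keystore and truststore intervals, the poll timeout would be the smaller of the two remaining times, which is what `runLoop` in the PR computes.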
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #10156: KAFKA-10345 (WIP): File watch store reloading

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10156:
URL: https://github.com/apache/kafka/pull/10156#discussion_r580469819



##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -295,7 +442,7 @@ else if (keyPassword == null)
             throw new InvalidConfigurationException("SSL key store is not specified, but key store password is specified.");
         } else if (path != null && password == null) {
             throw new InvalidConfigurationException("SSL key store is specified, but key store password is not specified.");
-        } else if (path != null && password != null) {
+        } else if (path != null) {

Review comment:
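       Cleanup of redundant logic: `password != null` is already implied here, because the previous branch throws when `path != null && password == null`.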







[GitHub] [kafka] rajinisivaram commented on a change in pull request #10156: KAFKA-10345: File watch store reloading

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #10156:
URL: https://github.com/apache/kafka/pull/10156#discussion_r642939618



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
##########
@@ -98,7 +99,7 @@ private ClientUtils() {
      *
      * @return configured ChannelBuilder based on the configs.
      */
-    public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time, LogContext logContext) {
+    public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time, LogContext logContext) throws IOException {

Review comment:
       Rather than propagating IOException everywhere, couldn't we use one of the Kafka exceptions that is not a checked exception?
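
One way to do this, sketched with the JDK's `UncheckedIOException` (an unchecked Kafka exception such as `org.apache.kafka.common.KafkaException` could play the same role): wrap the checked exception where the watch service is created, so signatures like `createChannelBuilder` need no `throws IOException`. `WatchServices.create` is a hypothetical helper name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileSystems;
import java.nio.file.WatchService;

// Hypothetical helper: converts the checked IOException into an unchecked one,
// so callers further up the stack do not need "throws IOException".
final class WatchServices {
    private WatchServices() { }

    static WatchService create() {
        try {
            return FileSystems.getDefault().newWatchService();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to create file watch service", e);
        }
    }
}
```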

##########
File path: clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##########
@@ -70,7 +73,7 @@ public static ChannelBuilder clientChannelBuilder(
             String clientSaslMechanism,
             Time time,
             boolean saslHandshakeRequestEnable,
-            LogContext logContext) {
+            LogContext logContext) throws IOException {

Review comment:
       As above, propagating IOException here seems too noisy.

##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -127,6 +158,131 @@ public KeyStore truststore() {
         return this.truststore != null ? this.truststore.get() : null;
     }
 
+    class SecurityFileChangeListener implements Runnable {
+        private final Timer keyStoreRefreshTimer;
+        private final Timer trustStoreRefreshTimer;
+        private final Map<WatchKey, Path> watchKeyPathMap = new HashMap<>();
+        private final Map<Path, SecurityStore> fileToStoreMap = new HashMap<>();
+        private final AtomicReference<Exception> lastLoadFailure = new AtomicReference<>(null);
+
+        SecurityFileChangeListener(final Timer keyStoreRefreshTimer,
+                                   final Timer trustStoreRefreshTimer) {
+            this.keyStoreRefreshTimer = keyStoreRefreshTimer;
+            this.trustStoreRefreshTimer = trustStoreRefreshTimer;
+        }
+
+        void updateStoreKey(SecurityStore store, final String watchFile) {
+            try {
+                Path filePath = Paths.get(watchFile);
+                fileToStoreMap.put(filePath, store);
+
+                Path dirPath = filePath.getParent();
+                if (dirPath == null) {
+                    throw new IOException("Unexpected null path with no parent");
+                }
+
+                if (!Files.exists(dirPath)) {
+                    Files.createDirectories(dirPath);
+                }
+                WatchKey watchkey = dirPath.register(watchService,
+                    StandardWatchEventKinds.ENTRY_CREATE,
+                    StandardWatchEventKinds.ENTRY_MODIFY,
+                    StandardWatchEventKinds.ENTRY_DELETE,
+                    StandardWatchEventKinds.OVERFLOW);
+                watchKeyPathMap.put(watchkey, dirPath);
+                log.info("Watch service registered for store path = {}", dirPath);
+            } catch (IOException e) {
+                // If the update failed, we will try to use existing store path instead.
+                log.error("Could not register store path for file {}", watchFile, e);
+            }
+        }
+
+        // For testing purpose now.
+        Exception lastLoadFailure() {
+            return lastLoadFailure.get();
+        }
+
+        public void run() {
+            for (Map.Entry<WatchKey, Path> key : watchKeyPathMap.entrySet()) {
+                log.debug("Starting listening for change key {} for path {}", key.getKey(), key.getValue());
+            }
+            resetKeyStoreTimer();
+            resetTrustStoreTimer();
+
+            try {
+                runLoop();
+            } catch (InterruptedException ie) {
+                log.debug("Security file listener {} was interrupted to shutdown", watchKeyPathMap);
+            } catch (Exception e) {
+                log.warn("Hit a fatal exception in security file listener", e);
+            }
+        }
+
+        private void runLoop() throws InterruptedException {
+            while (!watchKeyPathMap.isEmpty()) {
+                keyStoreRefreshTimer.update();
+                trustStoreRefreshTimer.update();
+                final long maxPollIntervalMs = Math.min(keyStoreRefreshTimer.remainingMs(),
+                    trustStoreRefreshTimer.remainingMs());
+                log.debug("Max poll interval is {} with trust store remaining time {} and trust store time {}",
+                    maxPollIntervalMs, trustStoreRefreshTimer.remainingMs(), trustStoreRefreshIntervalMs);
+                WatchKey watchKey = watchService.poll(maxPollIntervalMs, TimeUnit.MILLISECONDS);
+
+                // Handle file update triggered events.
+                if (watchKey != null && watchKeyPathMap.containsKey(watchKey)) {
+                    for (WatchEvent<?> event: watchKey.pollEvents()) {
+                        log.debug("Got file change event: {} {}", event.kind(), event.context());
+
+                        @SuppressWarnings("unchecked")
+                        Path filePath = watchKeyPathMap.get(watchKey).resolve(((WatchEvent<Path>) event).context());
+
+                        if (fileToStoreMap.containsKey(filePath)) {
+                            maybeReloadStore(fileToStoreMap.get(filePath));
+                            sslContext.set(createSSLContext(keystore, truststore));
+                        } else {
+                            log.debug("Unknown file name: {}", filePath);
+                        }
+                    }
+                    if (!watchKey.reset()) {
+                        watchKeyPathMap.remove(watchKey);
+                    }
+                }
+
+                keyStoreRefreshTimer.update();
+                if (keyStoreRefreshTimer.isExpired()) {
+                    maybeReloadStore(keystore);
+                    resetKeyStoreTimer();
+                }
+
+                trustStoreRefreshTimer.update();
+                if (trustStoreRefreshTimer.isExpired()) {
+                    maybeReloadStore(truststore);
+                    resetTrustStoreTimer();
+                }
+            }
+        }
+
+        private void resetKeyStoreTimer() {
+            keyStoreRefreshTimer.updateAndReset(keyStoreRefreshIntervalMs);
+        }
+
+        private void resetTrustStoreTimer() {
+            trustStoreRefreshTimer.updateAndReset(trustStoreRefreshIntervalMs);
+        }
+
+        private void maybeReloadStore(SecurityStore store) {
+            try {
+                if (!(store instanceof FileBasedStore))
+                    throw new IllegalStateException("Store " + store + " is expected to be file-based for reloading.");
+                ((FileBasedStore) store).reloadStore(store == keystore);
+                log.info("Reloaded store {}", store);
+            } catch (Exception e) {
+                log.error("Encountered a load failure for store {}", store, e);

Review comment:
       Do we have tests that verify that if the file was deleted or corrupted, we continue to work with the old stores?
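
The property to test is that a failed reload (deleted or corrupted file) leaves the previously loaded store in place. A minimal, framework-free sketch of that behaviour, assuming a hypothetical `ReloadableStore<T>` holder with a parser function; a real test would exercise the PR's `FileBasedStore` and `lastLoadFailure()` instead.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical holder: replaces the current value only when a reload succeeds.
final class ReloadableStore<T> {
    private final Path file;
    private final Function<byte[], T> parser;
    private final AtomicReference<T> current = new AtomicReference<>();
    private volatile Exception lastLoadFailure;

    ReloadableStore(Path file, Function<byte[], T> parser) {
        this.file = file;
        this.parser = parser;
        reload();
    }

    void reload() {
        try {
            T loaded = parser.apply(Files.readAllBytes(file));
            current.set(loaded);
            lastLoadFailure = null;
        } catch (IOException | RuntimeException e) {
            // Deleted (NoSuchFileException) or corrupted (parser throws): keep the old value.
            lastLoadFailure = e;
        }
    }

    T get() {
        return current.get();
    }

    Exception lastLoadFailure() {
        return lastLoadFailure;
    }
}
```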

##########
File path: clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
##########
@@ -249,6 +253,8 @@ public void close()  {
         for (AuthenticateCallbackHandler handler : saslCallbackHandlers.values())
             handler.close();
         if (sslFactory != null) sslFactory.close();
+
+        Utils.closeQuietly(watchService, "Watch service");

Review comment:
       If `watchService` is being passed in from outside because we want to share or reuse it, we shouldn't be closing it here. Otherwise, we could avoid injecting it and let `SslFactory` create it.
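
A sketch of the ownership rule being suggested, with hypothetical names: the component closes the `WatchService` only if it created it, and leaves a caller-supplied (shared) instance open for its owner to close.

```java
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.WatchService;

// Hypothetical component illustrating "close only what you create".
final class WatchingComponent implements Closeable {
    private final WatchService watchService;
    private final boolean ownsWatchService;

    /** Creates and owns a private watch service. */
    WatchingComponent() throws IOException {
        this(FileSystems.getDefault().newWatchService(), true);
    }

    /** Uses a shared watch service that remains owned by the caller. */
    WatchingComponent(WatchService shared) {
        this(shared, false);
    }

    private WatchingComponent(WatchService watchService, boolean ownsWatchService) {
        this.watchService = watchService;
        this.ownsWatchService = ownsWatchService;
    }

    WatchService watchService() {
        return watchService;
    }

    @Override
    public void close() throws IOException {
        if (ownsWatchService) {
            watchService.close();
        }
        // A shared watch service is left open for its owner to close.
    }
}
```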

##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -73,6 +84,7 @@
 
     private static final Logger log = LoggerFactory.getLogger(DefaultSslEngineFactory.class);
     public static final String PEM_TYPE = "PEM";
+    private static final long DEFAULT_SECURITY_STORE_REFRESH_INTERVAL_MS = 300_000L;

Review comment:
       Shouldn't the default come from the default value in the config definition instead of a separate constant?

##########
File path: clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
##########
@@ -103,7 +110,12 @@
     public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
 
     public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
-    public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. ";
+    public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file.";
+
+    public static final String SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG = "ssl.truststore.location.refresh.interval.ms";
+    public static final String SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_DOC = "The refresh interval for in-place ssl truststore updates. In general, "
+        + "the update should trigger immediately when user modifies the security file path through file watch service, while "
+        + "this configuration is defining a time based guarantee of store reloading in worst case";

Review comment:
       Same comment on the docs as for the keystore refresh interval config (`ssl.keystore.location.refresh.interval.ms`).

##########
File path: clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
##########
@@ -116,6 +120,7 @@ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize
     @Override
     public void close() {
         if (sslFactory != null) sslFactory.close();
+        Utils.closeQuietly(watchService, "Watch service");

Review comment:
       Same as above: should `SslFactory` create and close this, or should we use a shared watch service?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -81,12 +93,34 @@
     private String tmfAlgorithm;
     private SecurityStore keystore;
     private SecurityStore truststore;
+    private long keyStoreRefreshIntervalMs = DEFAULT_SECURITY_STORE_REFRESH_INTERVAL_MS;
+    private long trustStoreRefreshIntervalMs = DEFAULT_SECURITY_STORE_REFRESH_INTERVAL_MS;
+    private final SecurityFileChangeListener securityFileChangeListener;
+    private final Thread securityStoreRefreshThread;
+    private final WatchService watchService;
+
     private String[] cipherSuites;
     private String[] enabledProtocols;
     private SecureRandom secureRandomImplementation;
-    private SSLContext sslContext;
+    private AtomicReference<SSLContext> sslContext = new AtomicReference<>(null);
     private SslClientAuth sslClientAuth;
 
+    public DefaultSslEngineFactory(final WatchService watchService) {
+        this(watchService, SystemTime.SYSTEM);
+    }
+
+    // For testing only
+    DefaultSslEngineFactory(final WatchService watchService,
+                            final Time time) {
+        this.securityFileChangeListener = new SecurityFileChangeListener(time.timer(Long.MAX_VALUE), time.timer(Long.MAX_VALUE));
+        this.securityStoreRefreshThread = new Thread(securityFileChangeListener, "security-store-refresh-thread");

Review comment:
       Since this only applies to file-based key/trust stores, we should check whether they are configured before creating the watch service and thread.
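
A sketch of that check, assuming the relevant settings are `ssl.keystore.location` and `ssl.truststore.location` and using a hypothetical `needsFileWatcher` helper: the watch service and refresh thread would only be created when it returns true.

```java
import java.util.Map;

// Hypothetical helper: only stores configured through a *.location file can be
// reloaded from disk, so without one there is nothing to watch.
final class StoreWatchCheck {
    static final String KEYSTORE_LOCATION = "ssl.keystore.location";
    static final String TRUSTSTORE_LOCATION = "ssl.truststore.location";

    private StoreWatchCheck() { }

    static boolean needsFileWatcher(Map<String, ?> configs) {
        return isSet(configs.get(KEYSTORE_LOCATION)) || isSet(configs.get(TRUSTSTORE_LOCATION));
    }

    private static boolean isSet(Object value) {
        return value != null && !value.toString().trim().isEmpty();
    }
}
```

Stores supplied inline (for example via `ssl.keystore.certificate.chain`) do not trigger the watcher under this check.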

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -441,7 +442,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
     }
 
     // visible for testing
-    Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
+    Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) throws IOException {

Review comment:
       Changes to this file can be reverted if we don't propagate IOException.

##########
File path: clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##########
@@ -180,7 +183,8 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
                         sslClientAuthOverride,
                         time,
                         logContext,
-                        apiVersionSupplier);
+                        apiVersionSupplier,
+                        createWatchService());

Review comment:
       Not sure what we decided: are we going to have a mechanism to disable watching, e.g. in clients?

##########
File path: clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
##########
@@ -86,9 +88,14 @@
         + "Default SSL engine factory supports only PEM format with X.509 certificates.";
 
     public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
-    public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. "
+    public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file."
         + "This is optional for client and can be used for two-way authentication for client.";
 
+    public static final String SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG = "ssl.keystore.location.refresh.interval.ms";
+    public static final String SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_DOC = "The refresh interval for in-place ssl keystore updates. In general, "
+        + "the update should trigger immediately when user modifies the security file path through file watch service, while "
+        +  "this configuration is defining a time based guarantee of store reloading in worst case";

Review comment:
       We should clarify that this only applies to SSL keystores defined using `ssl.keystore.location` with the default SSL engine factory. It doesn't apply to custom factories or to stores configured using `ssl.keystore.certificate.chain`. We may also want to describe the behaviour when store parameters such as the password change.

##########
File path: clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
##########
@@ -195,7 +195,7 @@ private TestJaasConfig configureMechanisms(String clientMechanism, List<String>
         return TestJaasConfig.createConfiguration(clientMechanism, serverMechanisms);
     }
 
-    private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> clientConfigs) {
+    private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> clientConfigs) throws IOException {

Review comment:
       I think quite a few of the files have changes that are related to either the propagation of IOException or the propagation of a watch service, both of which can be reverted.

##########
File path: clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
##########
@@ -313,12 +322,124 @@ private String pemFilePath(String pem) throws Exception {
         return TestUtils.tempFile(pem).getAbsolutePath();
     }
 
-    private Password pemAsConfigValue(String... pemValues)  throws Exception {
+    private Password pemAsConfigValue(String... pemValues) {
         StringBuilder builder = new StringBuilder();
         for (String pem : pemValues) {
             builder.append(pem);
             builder.append("\n");
         }
         return new Password(builder.toString().trim());
     }
+
+    @Test
+    public void testKeyStoreFileTriggerReload() throws Exception {
+        MockTime time = new MockTime(0L, 0L, 0L);
+        DefaultSslEngineFactory factory = new DefaultSslEngineFactory(time);
+        configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L);
+        configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
+
+        final String filePath = pemFilePath(pemAsConfigValue(ENCRYPTED_KEY, CERTCHAIN).value());
+
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, filePath);
+        configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD);
+        configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE);
+        factory.configure(configs);
+
+        // Make sure the thread starts to listen for file changes.
+        sleep(1000);

Review comment:
       Yes, we should remove `sleep` from the tests and make sure they work without it.
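
Where a test genuinely has to wait for the background thread (for example, until a reload has happened), a bounded condition wait is the usual replacement for a fixed `sleep`. Kafka's test utilities provide `TestUtils.waitForCondition` for this; the self-contained helper below is a hypothetical stand-in that shows the pattern.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a wait-for-condition test helper.
final class Waits {
    private Waits() { }

    static void waitUntil(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError("Condition not met within " + timeoutMs + " ms: " + message);
            }
            Thread.sleep(10); // short back-off between checks
        }
    }
}
```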

##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -127,6 +158,131 @@ public KeyStore truststore() {
         return this.truststore != null ? this.truststore.get() : null;
     }
 
+    class SecurityFileChangeListener implements Runnable {
+        private final Timer keyStoreRefreshTimer;
+        private final Timer trustStoreRefreshTimer;
+        private final Map<WatchKey, Path> watchKeyPathMap = new HashMap<>();
+        private final Map<Path, SecurityStore> fileToStoreMap = new HashMap<>();
+        private final AtomicReference<Exception> lastLoadFailure = new AtomicReference<>(null);
+
+        SecurityFileChangeListener(final Timer keyStoreRefreshTimer,
+                                   final Timer trustStoreRefreshTimer) {
+            this.keyStoreRefreshTimer = keyStoreRefreshTimer;
+            this.trustStoreRefreshTimer = trustStoreRefreshTimer;
+        }
+
+        void updateStoreKey(SecurityStore store, final String watchFile) {
+            try {
+                Path filePath = Paths.get(watchFile);
+                fileToStoreMap.put(filePath, store);
+
+                Path dirPath = filePath.getParent();
+                if (dirPath == null) {
+                    throw new IOException("Unexpected null path with no parent");
+                }
+
+                if (!Files.exists(dirPath)) {
+                    Files.createDirectories(dirPath);
+                }
+                WatchKey watchkey = dirPath.register(watchService,
+                    StandardWatchEventKinds.ENTRY_CREATE,
+                    StandardWatchEventKinds.ENTRY_MODIFY,
+                    StandardWatchEventKinds.ENTRY_DELETE,
+                    StandardWatchEventKinds.OVERFLOW);
+                watchKeyPathMap.put(watchkey, dirPath);
+                log.info("Watch service registered for store path = {}", dirPath);
+            } catch (IOException e) {
+                // If the update failed, we will try to use existing store path instead.
+                log.error("Could not register store path for file {}", watchFile, e);
+            }
+        }
+
+        // For testing purpose now.
+        Exception lastLoadFailure() {
+            return lastLoadFailure.get();
+        }
+
+        public void run() {
+            for (Map.Entry<WatchKey, Path> key : watchKeyPathMap.entrySet()) {
+                log.debug("Starting listening for change key {} for path {}", key.getKey(), key.getValue());
+            }
+            resetKeyStoreTimer();
+            resetTrustStoreTimer();
+
+            try {
+                runLoop();
+            } catch (InterruptedException ie) {
+                log.debug("Security file listener {} was interrupted to shutdown", watchKeyPathMap);
+            } catch (Exception e) {
+                log.warn("Hit a fatal exception in security file listener", e);

Review comment:
       Should this be at ERROR level?







[GitHub] [kafka] abbccdda commented on a change in pull request #10156: KAFKA-10345: File watch store reloading

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10156:
URL: https://github.com/apache/kafka/pull/10156#discussion_r633178435



##########
File path: clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
##########
@@ -579,6 +583,9 @@ public SslConfigsBuilder usePem(boolean usePem) {
 
         DefaultSslEngineFactory defaultSslEngineFactory = new DefaultSslEngineFactory();
 
+        public TestSslEngineFactory() throws IOException {

Review comment:
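       It is needed: the `defaultSslEngineFactory` field initializer calls `new DefaultSslEngineFactory()`, which can throw IOException, so the constructor has to declare it.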







[GitHub] [kafka] abbccdda commented on a change in pull request #10156: KAFKA-10345: File watch store reloading

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10156:
URL: https://github.com/apache/kafka/pull/10156#discussion_r633178155



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Timer.java
##########
@@ -158,6 +158,10 @@ public long remainingMs() {
         return Math.max(0, deadlineMs - currentTimeMs);
     }
 
+    public long getDeadlineMs() {

Review comment:
       Seems ok to remove








[GitHub] [kafka] soarez commented on a change in pull request #10156: KAFKA-10345: File watch store reloading

Posted by GitBox <gi...@apache.org>.
soarez commented on a change in pull request #10156:
URL: https://github.com/apache/kafka/pull/10156#discussion_r628515373



##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -81,12 +94,31 @@
     private String tmfAlgorithm;
     private SecurityStore keystore;
     private SecurityStore truststore;
+    private long keyStoreRefreshIntervalMs = DEFAULT_SECURITY_STORE_REFRESH_INTERVAL_MS;
+    private long trustStoreRefreshIntervalMs = DEFAULT_SECURITY_STORE_REFRESH_INTERVAL_MS;
+    private final SecurityFileChangeListener securityFileChangeListener;
+    private final Thread securityStoreRefreshThread;
+
     private String[] cipherSuites;
     private String[] enabledProtocols;
     private SecureRandom secureRandomImplementation;
-    private SSLContext sslContext;
+    private AtomicReference<SSLContext> sslContext = new AtomicReference<>(null);
     private SslClientAuth sslClientAuth;
 
+    public DefaultSslEngineFactory() throws IOException {
+        this(SystemTime.SYSTEM);
+    }
+
+    // For testing only
+    public DefaultSslEngineFactory(Time time) throws IOException {

Review comment:
       Should this be public?

##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -127,6 +156,138 @@ public KeyStore truststore() {
         return this.truststore != null ? this.truststore.get() : null;
     }
 
+    class SecurityFileChangeListener implements Runnable {
+        private final Timer keyStoreRefreshTimer;
+        private final Timer trustStoreRefreshTimer;
+        private final WatchService watchService;
+        private final Map<WatchKey, Path> watchKeyPathMap = new HashMap<>();
+        private final Map<Path, SecurityStore> fileToStoreMap = new HashMap<>();
+        private final AtomicReference<Exception> lastLoadFailure = new AtomicReference<>(null);

Review comment:
       Since the value is only ever set and read, could this simply be `private volatile Exception lastLoadFailure`?
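
A sketch of the suggested simplification (class name hypothetical): for a plain set/get, a `volatile` field gives the same cross-thread visibility as `AtomicReference`, which is only needed for compound operations such as `compareAndSet`.

```java
// Hypothetical listener fragment: the refresh thread writes, other threads read.
final class LoadFailureTracker {
    // volatile guarantees that a write by the refresh thread is visible to readers.
    private volatile Exception lastLoadFailure;

    void recordFailure(Exception e) {
        lastLoadFailure = e;
    }

    void clearFailure() {
        lastLoadFailure = null;
    }

    Exception lastLoadFailure() {
        return lastLoadFailure;
    }
}
```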

##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -127,6 +156,138 @@ public KeyStore truststore() {
         return this.truststore != null ? this.truststore.get() : null;
     }
 
+    class SecurityFileChangeListener implements Runnable {
+        private final Timer keyStoreRefreshTimer;
+        private final Timer trustStoreRefreshTimer;
+        private final WatchService watchService;
+        private final Map<WatchKey, Path> watchKeyPathMap = new HashMap<>();
+        private final Map<Path, SecurityStore> fileToStoreMap = new HashMap<>();
+        private final AtomicReference<Exception> lastLoadFailure = new AtomicReference<>(null);
+
+        SecurityFileChangeListener(final Timer keyStoreRefreshTimer,
+                                   final Timer trustStoreRefreshTimer) throws IOException {
+            this.keyStoreRefreshTimer = keyStoreRefreshTimer;
+            this.trustStoreRefreshTimer = trustStoreRefreshTimer;
+            try {
+                watchService = FileSystems.getDefault().newWatchService();
+            } catch (IOException e) {
+                log.error("Failed to run the listener thread due to IO exception", e);
+                throw e;
+            }
+        }
+
+        void updateStoreKey(SecurityStore store, final String watchFile) {
+            try {
+                Path filePath = Paths.get(watchFile);
+                fileToStoreMap.put(filePath, store);
+
+                Path dirPath = filePath.getParent();
+                if (dirPath == null) {
+                    throw new IOException("Unexpected null path with no parent");
+                }
+
+                if (!Files.exists(dirPath)) {
+                    Files.createDirectories(dirPath);
+                }
+                WatchKey watchkey = dirPath.register(watchService,
+                    StandardWatchEventKinds.ENTRY_CREATE,
+                    StandardWatchEventKinds.ENTRY_MODIFY,
+                    StandardWatchEventKinds.ENTRY_DELETE,
+                    StandardWatchEventKinds.OVERFLOW);
+                watchKeyPathMap.put(watchkey, dirPath);
+                log.info("Watch service registered for store path = {}", dirPath);
+            } catch (IOException e) {
+                // If the update failed, we will try to use existing store path instead.
+                log.error("Could not register store path for file {}", watchFile, e);
+            }
+        }
+
+        // For testing purpose now.
+        Exception lastLoadFailure() {
+            return lastLoadFailure.get();
+        }
+
+        public void run() {
+            for (Map.Entry<WatchKey, Path> key : watchKeyPathMap.entrySet()) {
+                log.debug("Starting listening for change key {} for path {}", key.getKey(), key.getValue());
+            }
+            resetKeyStoreTimer();
+            resetTrustStoreTimer();
+
+            try {
+                runLoop();
+            } catch (InterruptedException ie) {
+                log.debug("Security file listener {} was interrupted to shutdown", watchKeyPathMap);
+            } catch (Exception e) {
+                log.warn("Hit a fatal exception in security file listener", e);
+            }
+        }
+
+        private void runLoop() throws InterruptedException {
+            while (!watchKeyPathMap.isEmpty()) {
+                keyStoreRefreshTimer.update();
+                trustStoreRefreshTimer.update();
+                final long maxPollIntervalMs = Math.min(keyStoreRefreshTimer.remainingMs(),
+                    trustStoreRefreshTimer.remainingMs());
+                log.debug("Max poll interval is {} with trust store remaining time {} and trust store time {}",
+                    maxPollIntervalMs, trustStoreRefreshTimer.remainingMs(), trustStoreRefreshIntervalMs);
+                WatchKey watchKey = watchService.poll(maxPollIntervalMs, TimeUnit.MILLISECONDS);
+
+                // Handle file update triggered events.
+                if (watchKey != null && watchKeyPathMap.containsKey(watchKey)) {
+                    for (WatchEvent<?> event: watchKey.pollEvents()) {
+                        log.debug("Got file change event: {} {}", event.kind(), event.context());
+
+                        @SuppressWarnings("unchecked")
+                        Path filePath = watchKeyPathMap.get(watchKey).resolve(((WatchEvent<Path>) event).context());
+
+                        if (fileToStoreMap.containsKey(filePath)) {
+                            maybeReloadStore(fileToStoreMap.get(filePath));
+                            sslContext.set(createSSLContext(keystore, truststore));
+                        } else {
+                            log.debug("Unknown file name: {}", filePath);
+                        }
+                    }
+                    if (!watchKey.reset()) {
+                        watchKeyPathMap.remove(watchKey);

Review comment:
       This will stop watching the file, right? Should we log a warning indicating that the file is no longer watched?
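
A sketch of what that could look like, using `java.util.logging` in place of Kafka's SLF4J logger and a hypothetical `resetOrForget` helper. When `WatchKey.reset()` returns false the key is invalid and will deliver no further events, so the directory is dropped from the map and a warning names the path that is no longer watched.

```java
import java.nio.file.Path;
import java.nio.file.WatchKey;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper around WatchKey.reset().
final class WatchKeys {
    private static final Logger LOG = Logger.getLogger(WatchKeys.class.getName());

    private WatchKeys() { }

    /** Returns true if the key is still valid; otherwise stops tracking it and warns. */
    static boolean resetOrForget(WatchKey key, Map<WatchKey, Path> watchKeyPathMap) {
        if (key.reset()) {
            return true;
        }
        Path dir = watchKeyPathMap.remove(key);
        LOG.warning("Watch key for " + dir + " is no longer valid; files in it are no longer watched");
        return false;
    }
}
```

Note that in the PR's `runLoop`, removing the last key also ends the `while (!watchKeyPathMap.isEmpty())` loop, which stops the time-based refresh as well.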

##########
File path: clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
##########
@@ -313,12 +322,124 @@ private String pemFilePath(String pem) throws Exception {
+    @Test
+    public void testKeyStoreFileTriggerReload() throws Exception {
+        MockTime time = new MockTime(0L, 0L, 0L);
+        DefaultSslEngineFactory factory = new DefaultSslEngineFactory(time);
+        configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L);
+        configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
+
+        final String filePath = pemFilePath(pemAsConfigValue(ENCRYPTED_KEY, CERTCHAIN).value());
+
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, filePath);
+        configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD);
+        configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE);
+        factory.configure(configs);
+
+        // Make sure the thread starts to listen for file changes.
+        sleep(1000);

Review comment:
       I don't think this is necessary. The SecurityFileChangeListener thread may not yet have started, but the watch services are already registered after `factory.configure(configs)`. The file change below should queue a change even if the thread hasn't started.
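A small sketch of the underlying `java.nio.file` behaviour this relies on: once a directory is registered, a modification is queued on its `WatchKey` even if nothing is polling yet, so a later `poll` still sees it. This exercises the JDK `WatchService` directly, not Kafka's listener; the class name, method name and `store.pem` file name are illustrative only.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;

public class WatchQueueSketch {
    // Register first, modify the file, and only then poll: no sleep in between.
    static boolean modificationSeenWithoutSleep(Path dir, String fileName)
            throws IOException, InterruptedException {
        Path file = dir.resolve(fileName);
        Files.write(file, "old".getBytes(StandardCharsets.UTF_8));
        try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
            // Stands in for the registration that happens during configure().
            dir.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);

            // Rewrite the file before anything has polled the service.
            Files.write(file, "new".getBytes(StandardCharsets.UTF_8));

            // Generous timeout: some platforms back WatchService with polling.
            WatchKey key = watchService.poll(30, TimeUnit.SECONDS);
            if (key == null) {
                return false;
            }
            for (WatchEvent<?> event : key.pollEvents()) {
                if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY
                        && fileName.equals(event.context().toString())) {
                    return true;
                }
            }
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("watch-queue");
        try {
            System.out.println(modificationSeenWithoutSleep(dir, "store.pem"));
        } finally {
            Files.deleteIfExists(dir.resolve("store.pem"));
            Files.deleteIfExists(dir);
        }
    }
}
```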

##########
File path: clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
##########
@@ -579,6 +583,9 @@ public SslConfigsBuilder usePem(boolean usePem) {
 
         DefaultSslEngineFactory defaultSslEngineFactory = new DefaultSslEngineFactory();
 
+        public TestSslEngineFactory() throws IOException {

Review comment:
       Is this needed?

##########
File path: clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
##########
@@ -313,12 +322,124 @@ private String pemFilePath(String pem) throws Exception {
         return TestUtils.tempFile(pem).getAbsolutePath();
     }
 
-    private Password pemAsConfigValue(String... pemValues)  throws Exception {
+    private Password pemAsConfigValue(String... pemValues) {
         StringBuilder builder = new StringBuilder();
         for (String pem : pemValues) {
             builder.append(pem);
             builder.append("\n");
         }
         return new Password(builder.toString().trim());
     }
+
+    @Test
+    public void testKeyStoreFileTriggerReload() throws Exception {
+        MockTime time = new MockTime(0L, 0L, 0L);
+        DefaultSslEngineFactory factory = new DefaultSslEngineFactory(time);
+        configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L);
+        configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
+
+        final String filePath = pemFilePath(pemAsConfigValue(ENCRYPTED_KEY, CERTCHAIN).value());
+
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, filePath);
+        configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD);
+        configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE);
+        factory.configure(configs);
+
+        // Make sure the thread starts to listen for file changes.
+        sleep(1000);
+
+        final FileWriter writer = new FileWriter(filePath);
+        writer.write(pemAsConfigValue(KEY, CERTCHAIN).value());
+        writer.flush();
+        writer.close();
+
+        TestUtils.waitForCondition(() -> factory.securityFileChangeListener().lastLoadFailure() != null,
+            "key store not reloaded or encountered expected failure");
+    }
+
+    @Test
+    public void testKeyStoreTimeBasedReload() throws Exception {
+        MockTime time = new MockTime(0L, 0L, 0L);
+        DefaultSslEngineFactory factory = new DefaultSslEngineFactory(time);
+        configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L);
+        configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
+
+        final String filePath = pemFilePath(pemAsConfigValue(ENCRYPTED_KEY, CERTCHAIN).value());
+
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, filePath);
+        configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD);
+        configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE);
+        factory.configure(configs);
+
+        final FileWriter writer = new FileWriter(filePath);
+        writer.write(pemAsConfigValue(KEY, CERTCHAIN).value());
+        writer.flush();
+        writer.close();
+
+        time.setCurrentTimeMs(1200L);
+        TestUtils.waitForCondition(() -> factory.securityFileChangeListener().lastLoadFailure() != null,
+            "key store not reloaded or encountered expected failure");
+    }
+
+
+
+    @Test
+    public void testTrustStoreFileTriggerReload() throws Exception {
+        DefaultSslEngineFactory factory = new DefaultSslEngineFactory();
+        configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+        configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L);
+        configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_REFRESH_INTERVAL_MS_CONFIG, 1000L);
+
+        final String filePath = pemFilePath(CA1);
+
+        configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, filePath);
+        configs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE);
+        factory.configure(configs);
+
+        // Make sure the thread starts to listen for file changes.
+        sleep(1000);

Review comment:
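       Same here: the `sleep(1000)` should not be needed, since the watch is already registered during `factory.configure(configs)`.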

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Timer.java
##########
@@ -158,6 +158,10 @@ public long remainingMs() {
         return Math.max(0, deadlineMs - currentTimeMs);
     }
 
+    public long getDeadlineMs() {

Review comment:
       I don't see where this is used. Is it necessary?




[GitHub] [kafka] abbccdda commented on a change in pull request #10156: KAFKA-10345: File watch store reloading

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10156:
URL: https://github.com/apache/kafka/pull/10156#discussion_r633178155



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Timer.java
##########
@@ -158,6 +158,10 @@ public long remainingMs() {
         return Math.max(0, deadlineMs - currentTimeMs);
     }
 
+    public long getDeadlineMs() {

Review comment:
       Seems ok to remove




[GitHub] [kafka] abbccdda commented on a change in pull request #10156: KAFKA-10345: File watch store reloading

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10156:
URL: https://github.com/apache/kafka/pull/10156#discussion_r584949583



##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -106,10 +138,7 @@ public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
         if (truststore != null && truststore.modified()) {
             return true;
         }
-        if (keystore != null && keystore.modified()) {

Review comment:
       logic cleanup




[GitHub] [kafka] abbccdda commented on a change in pull request #10156: KAFKA-10345: File watch store reloading

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10156:
URL: https://github.com/apache/kafka/pull/10156#discussion_r584950105



##########
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##########
@@ -211,15 +390,15 @@ private SSLEngine createSslEngine(Mode mode, String peerHost, int peerPort, Stri
         }
         return sslEngine;
     }
+
     private static SslClientAuth createSslClientAuth(String key) {
         SslClientAuth auth = SslClientAuth.forConfig(key);
         if (auth != null) {
             return auth;
         }
         log.warn("Unrecognized client authentication configuration {}.  Falling " +
                 "back to NONE.  Recognized client authentication configurations are {}.",
-                key, String.join(", ", SslClientAuth.VALUES.stream().
-                        map(Enum::name).collect(Collectors.toList())));
+                key, SslClientAuth.VALUES.stream().map(Enum::name).collect(Collectors.joining(", ")));

Review comment:
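       Side cleanup: `Collectors.joining(", ")` builds the joined string directly instead of collecting the names into a list and passing it to `String.join`.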
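A quick sketch showing that the two forms produce the same string. `ClientAuth` and `VALUES` here are hypothetical stand-ins shaped like the `SslClientAuth.VALUES` list used in the diff, not Kafka's actual class.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class JoiningSketch {
    // Hypothetical stand-in for the SslClientAuth enum.
    enum ClientAuth { REQUIRED, REQUESTED, NONE }

    static final List<ClientAuth> VALUES =
        Collections.unmodifiableList(Arrays.asList(ClientAuth.values()));

    // Before: collect the names into a List, then join that List.
    static String joinViaList() {
        return String.join(", ", VALUES.stream().map(Enum::name).collect(Collectors.toList()));
    }

    // After: join inside the stream with Collectors.joining.
    static String joinViaCollector() {
        return VALUES.stream().map(Enum::name).collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        System.out.println(joinViaList());      // REQUIRED, REQUESTED, NONE
        System.out.println(joinViaCollector()); // REQUIRED, REQUESTED, NONE
    }
}
```

The collector version skips the intermediate list, which is why it reads as a pure cleanup with no behaviour change.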



