You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/04/05 18:42:39 UTC

[kafka] branch trunk updated: KAFKA-8190; Don't update keystore modification time during validation (#6539)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 844120c  KAFKA-8190; Don't update keystore modification time during validation (#6539)
844120c is described below

commit 844120c601e87693e540fa98a9cdb4d0272f5601
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Apr 5 19:42:28 2019 +0100

    KAFKA-8190; Don't update keystore modification time during validation (#6539)
    
    Ensure that the modification time is checked against the file that was used to create the SSLContext currently in use, so that the SSLContext is updated whenever the file is modified and a config update request is received.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../kafka/common/security/ssl/SslFactory.java      | 16 ++++++++++-----
 .../kafka/common/security/ssl/SslFactoryTest.java  |  7 +++++++
 .../server/DynamicBrokerReconfigurationTest.scala  | 23 ++++++++++++++++++++++
 3 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index b9b5203..0c5094d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -139,6 +139,7 @@ public class SslFactory implements Reconfigurable {
                          (Password) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
         try {
             this.sslContext = createSSLContext(keystore, truststore);
+            log.debug("Created SSL context with keystore {} truststore {}", keystore, truststore);
         } catch (Exception e) {
             throw new KafkaException(e);
         }
@@ -173,6 +174,7 @@ public class SslFactory implements Reconfigurable {
                 SecurityStore keystore = newKeystore != null ? newKeystore : this.keystore;
                 SecurityStore truststore = newTruststore != null ? newTruststore : this.truststore;
                 this.sslContext = createSSLContext(keystore, truststore);
+                log.info("Created new SSL context with keystore {} truststore {}", keystore, truststore);
                 this.keystore = keystore;
                 this.truststore = truststore;
             } catch (Exception e) {
@@ -316,7 +318,7 @@ public class SslFactory implements Reconfigurable {
         private final String path;
         private final Password password;
         private final Password keyPassword;
-        private Long fileLastModifiedMs;
+        private final Long fileLastModifiedMs;
 
         SecurityStore(String type, String path, Password password, Password keyPassword) {
             Objects.requireNonNull(type, "type must not be null");
@@ -324,6 +326,7 @@ public class SslFactory implements Reconfigurable {
             this.path = path;
             this.password = password;
             this.keyPassword = keyPassword;
+            fileLastModifiedMs = lastModifiedMs(path);
         }
 
         /**
@@ -338,10 +341,6 @@ public class SslFactory implements Reconfigurable {
                 // If a password is not set access to the truststore is still available, but integrity checking is disabled.
                 char[] passwordChars = password != null ? password.value().toCharArray() : null;
                 ks.load(in, passwordChars);
-                fileLastModifiedMs = lastModifiedMs(path);
-
-                log.debug("Loaded key store with path {} modification time {}", path,
-                        fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs));
                 return ks;
             } catch (GeneralSecurityException | IOException e) {
                 throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e);
@@ -361,6 +360,13 @@ public class SslFactory implements Reconfigurable {
             Long modifiedMs = lastModifiedMs(path);
             return modifiedMs != null && !Objects.equals(modifiedMs, this.fileLastModifiedMs);
         }
+
+        @Override
+        public String toString() {
+            return "SecurityStore(" +
+                    "path=" + path +
+                    ", modificationTime=" + (fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs)) + ")";
+        }
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index bfe34c9..6c4a239 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -120,6 +120,13 @@ public class SslFactoryTest {
         assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext());
         sslContext = sslFactory.sslContext();
 
+        // Verify that context is recreated after validation on reconfigure() if config is not changed, but keystore file was modified
+        keyStoreFile.setLastModified(System.currentTimeMillis() + 15000);
+        sslFactory.validateReconfiguration(sslConfig);
+        sslFactory.reconfigure(sslConfig);
+        assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext());
+        sslContext = sslFactory.sslContext();
+
         // Verify that the context is not recreated if modification time cannot be determined
         keyStoreFile.setLastModified(System.currentTimeMillis() + 20000);
         Files.delete(keyStoreFile.toPath());
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index e8aa081..fb7b539 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -253,6 +253,29 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     alterSslKeystore(adminClient, sslPropertiesCopy, SecureInternal)
     verifyProduceConsume(producer, consumer, 10, topic2)
 
+    // Verify that keystores can be updated using same file name.
+    val reusableProps = sslProperties2.clone().asInstanceOf[Properties]
+    val reusableFile = File.createTempFile("keystore", ".jks")
+    reusableProps.setProperty(SSL_KEYSTORE_LOCATION_CONFIG, reusableFile.getPath)
+    Files.copy(new File(sslProperties1.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath,
+      reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+    alterSslKeystore(adminClient, reusableProps, SecureExternal)
+    val producer3 = ProducerBuilder().trustStoreProps(sslProperties2).maxRetries(0).build()
+    verifyAuthenticationFailure(producer3)
+    // Now alter using same file name. We can't check if the update has completed by comparing config on
+    // the broker, so we wait for producer operation to succeed to verify that the update has been performed.
+    Files.copy(new File(sslProperties2.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath,
+      reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+    reusableFile.setLastModified(System.currentTimeMillis() + 1000)
+    alterSslKeystore(adminClient, reusableProps, SecureExternal)
+    TestUtils.waitUntilTrue(() => {
+      try {
+        producer3.partitionsFor(topic).size() == numPartitions
+      } catch {
+        case _: Exception  => false
+      }
+    }, "Keystore not updated")
+
     // Verify that all messages sent with retries=0 while keystores were being altered were consumed
     stopAndVerifyProduceConsume(producerThread, consumerThread)
   }