You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/04/05 18:42:39 UTC
[kafka] branch trunk updated: KAFKA-8190;
Don't update keystore modification time during validation (#6539)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 844120c KAFKA-8190; Don't update keystore modification time during validation (#6539)
844120c is described below
commit 844120c601e87693e540fa98a9cdb4d0272f5601
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Apr 5 19:42:28 2019 +0100
KAFKA-8190; Don't update keystore modification time during validation (#6539)
Ensure that modification time is checked against the file used to create the SSLContext that is in-use so that SSLContext is updated whenever file is modified and a config update request is received.
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../kafka/common/security/ssl/SslFactory.java | 16 ++++++++++-----
.../kafka/common/security/ssl/SslFactoryTest.java | 7 +++++++
.../server/DynamicBrokerReconfigurationTest.scala | 23 ++++++++++++++++++++++
3 files changed, 41 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index b9b5203..0c5094d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -139,6 +139,7 @@ public class SslFactory implements Reconfigurable {
(Password) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
try {
this.sslContext = createSSLContext(keystore, truststore);
+ log.debug("Created SSL context with keystore {} truststore {}", keystore, truststore);
} catch (Exception e) {
throw new KafkaException(e);
}
@@ -173,6 +174,7 @@ public class SslFactory implements Reconfigurable {
SecurityStore keystore = newKeystore != null ? newKeystore : this.keystore;
SecurityStore truststore = newTruststore != null ? newTruststore : this.truststore;
this.sslContext = createSSLContext(keystore, truststore);
+ log.info("Created new SSL context with keystore {} truststore {}", keystore, truststore);
this.keystore = keystore;
this.truststore = truststore;
} catch (Exception e) {
@@ -316,7 +318,7 @@ public class SslFactory implements Reconfigurable {
private final String path;
private final Password password;
private final Password keyPassword;
- private Long fileLastModifiedMs;
+ private final Long fileLastModifiedMs;
SecurityStore(String type, String path, Password password, Password keyPassword) {
Objects.requireNonNull(type, "type must not be null");
@@ -324,6 +326,7 @@ public class SslFactory implements Reconfigurable {
this.path = path;
this.password = password;
this.keyPassword = keyPassword;
+ fileLastModifiedMs = lastModifiedMs(path);
}
/**
@@ -338,10 +341,6 @@ public class SslFactory implements Reconfigurable {
// If a password is not set access to the truststore is still available, but integrity checking is disabled.
char[] passwordChars = password != null ? password.value().toCharArray() : null;
ks.load(in, passwordChars);
- fileLastModifiedMs = lastModifiedMs(path);
-
- log.debug("Loaded key store with path {} modification time {}", path,
- fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs));
return ks;
} catch (GeneralSecurityException | IOException e) {
throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e);
@@ -361,6 +360,13 @@ public class SslFactory implements Reconfigurable {
Long modifiedMs = lastModifiedMs(path);
return modifiedMs != null && !Objects.equals(modifiedMs, this.fileLastModifiedMs);
}
+
+ @Override
+ public String toString() {
+ return "SecurityStore(" +
+ "path=" + path +
+ ", modificationTime=" + (fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs)) + ")";
+ }
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index bfe34c9..6c4a239 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -120,6 +120,13 @@ public class SslFactoryTest {
assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext());
sslContext = sslFactory.sslContext();
+ // Verify that context is recreated after validation on reconfigure() if config is not changed, but keystore file was modified
+ keyStoreFile.setLastModified(System.currentTimeMillis() + 15000);
+ sslFactory.validateReconfiguration(sslConfig);
+ sslFactory.reconfigure(sslConfig);
+ assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext());
+ sslContext = sslFactory.sslContext();
+
// Verify that the context is not recreated if modification time cannot be determined
keyStoreFile.setLastModified(System.currentTimeMillis() + 20000);
Files.delete(keyStoreFile.toPath());
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index e8aa081..fb7b539 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -253,6 +253,29 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
alterSslKeystore(adminClient, sslPropertiesCopy, SecureInternal)
verifyProduceConsume(producer, consumer, 10, topic2)
+ // Verify that keystores can be updated using same file name.
+ val reusableProps = sslProperties2.clone().asInstanceOf[Properties]
+ val reusableFile = File.createTempFile("keystore", ".jks")
+ reusableProps.setProperty(SSL_KEYSTORE_LOCATION_CONFIG, reusableFile.getPath)
+ Files.copy(new File(sslProperties1.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath,
+ reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+ alterSslKeystore(adminClient, reusableProps, SecureExternal)
+ val producer3 = ProducerBuilder().trustStoreProps(sslProperties2).maxRetries(0).build()
+ verifyAuthenticationFailure(producer3)
+ // Now alter using same file name. We can't check if the update has completed by comparing config on
+ // the broker, so we wait for producer operation to succeed to verify that the update has been performed.
+ Files.copy(new File(sslProperties2.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath,
+ reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+ reusableFile.setLastModified(System.currentTimeMillis() + 1000)
+ alterSslKeystore(adminClient, reusableProps, SecureExternal)
+ TestUtils.waitUntilTrue(() => {
+ try {
+ producer3.partitionsFor(topic).size() == numPartitions
+ } catch {
+ case _: Exception => false
+ }
+ }, "Keystore not updated")
+
// Verify that all messages sent with retries=0 while keystores were being altered were consumed
stopAndVerifyProduceConsume(producerThread, consumerThread)
}