You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2018/10/02 19:59:29 UTC
[kafka] branch trunk updated: KAFKA-7429: Enable key/truststore update with same filename/password (#5699)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8fb5e63 KAFKA-7429: Enable key/truststore update with same filename/password (#5699)
8fb5e63 is described below
commit 8fb5e63aa88019216e95fdbe0b6874c723b64bb4
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Oct 2 20:57:31 2018 +0100
KAFKA-7429: Enable key/truststore update with same filename/password (#5699)
---
.../kafka/common/security/ssl/SslFactory.java | 31 ++++++++++-
.../kafka/common/security/ssl/SslFactoryTest.java | 46 ++++++++++++++++
.../src/main/scala/kafka/server/AdminManager.scala | 2 +
.../scala/kafka/server/DynamicBrokerConfig.scala | 64 ++++++++++++++++++----
.../server/DynamicBrokerReconfigurationTest.scala | 9 +++
5 files changed, 141 insertions(+), 11 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index b1f7df8..b9b5203 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
@@ -47,6 +49,7 @@ import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
@@ -54,8 +57,9 @@ import java.util.Objects;
import java.util.Set;
import java.util.HashSet;
-
public class SslFactory implements Reconfigurable {
+ private static final Logger log = LoggerFactory.getLogger(SslFactory.class);
+
private final Mode mode;
private final String clientAuthConfigOverride;
private final boolean keystoreVerifiableUsingTruststore;
@@ -183,6 +187,9 @@ public class SslFactory implements Reconfigurable {
!Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), keystore.password) ||
!Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), keystore.keyPassword);
+ if (!keystoreChanged) {
+ keystoreChanged = keystore.modified();
+ }
if (keystoreChanged) {
return createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
(String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
@@ -197,6 +204,9 @@ public class SslFactory implements Reconfigurable {
!Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), truststore.path) ||
!Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), truststore.password);
+ if (!truststoreChanged) {
+ truststoreChanged = truststore.modified();
+ }
if (truststoreChanged) {
return createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
(String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
@@ -306,6 +316,7 @@ public class SslFactory implements Reconfigurable {
private final String path;
private final Password password;
private final Password keyPassword;
+ private Long fileLastModifiedMs;
SecurityStore(String type, String path, Password password, Password keyPassword) {
Objects.requireNonNull(type, "type must not be null");
@@ -327,11 +338,29 @@ public class SslFactory implements Reconfigurable {
// If a password is not set access to the truststore is still available, but integrity checking is disabled.
char[] passwordChars = password != null ? password.value().toCharArray() : null;
ks.load(in, passwordChars);
+ fileLastModifiedMs = lastModifiedMs(path);
+
+ log.debug("Loaded key store with path {} modification time {}", path,
+ fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs));
return ks;
} catch (GeneralSecurityException | IOException e) {
throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e);
}
}
+
+ private Long lastModifiedMs(String path) {
+ try {
+ return Files.getLastModifiedTime(Paths.get(path)).toMillis();
+ } catch (IOException e) {
+ log.error("Modification time of key store could not be obtained: " + path, e);
+ return null;
+ }
+ }
+
+ boolean modified() {
+ Long modifiedMs = lastModifiedMs(path);
+ return modifiedMs != null && !Objects.equals(modifiedMs, this.fileLastModifiedMs);
+ }
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index a134104..97021e3 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.security.ssl;
import java.io.File;
+import java.nio.file.Files;
import java.security.KeyStore;
import java.util.Map;
@@ -32,8 +33,11 @@ import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -82,6 +86,48 @@ public class SslFactoryTest {
}
@Test
+ public void testReconfiguration() throws Exception {
+ File trustStoreFile = File.createTempFile("truststore", ".jks");
+ Map<String, Object> sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
+ SslFactory sslFactory = new SslFactory(Mode.SERVER);
+ sslFactory.configure(sslConfig);
+ SSLContext sslContext = sslFactory.sslContext();
+ assertNotNull("SSL context not created", sslContext);
+ assertSame("SSL context recreated unnecessarily", sslContext, sslFactory.sslContext());
+ assertFalse(sslContext.createSSLEngine("localhost", 0).getUseClientMode());
+
+ // Verify that context is not recreated on reconfigure() if config and file are not changed
+ sslFactory.reconfigure(sslConfig);
+ assertSame("SSL context recreated unnecessarily", sslContext, sslFactory.sslContext());
+
+ // Verify that context is recreated on reconfigure() if config is changed
+ trustStoreFile = File.createTempFile("truststore", ".jks");
+ sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
+ sslFactory.reconfigure(sslConfig);
+ assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext());
+ sslContext = sslFactory.sslContext();
+
+ // Verify that context is recreated on reconfigure() if config is not changed, but truststore file was modified
+ trustStoreFile.setLastModified(System.currentTimeMillis() + 10000);
+ sslFactory.reconfigure(sslConfig);
+ assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext());
+ sslContext = sslFactory.sslContext();
+
+ // Verify that context is recreated on reconfigure() if config is not changed, but keystore file was modified
+ File keyStoreFile = new File((String) sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
+ keyStoreFile.setLastModified(System.currentTimeMillis() + 10000);
+ sslFactory.reconfigure(sslConfig);
+ assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext());
+ sslContext = sslFactory.sslContext();
+
+ // Verify that the context is not recreated if modification time cannot be determined
+ keyStoreFile.setLastModified(System.currentTimeMillis() + 20000);
+ Files.delete(keyStoreFile.toPath());
+ sslFactory.reconfigure(sslConfig);
+ assertSame("SSL context recreated unnecessarily", sslContext, sslFactory.sslContext());
+ }
+
+ @Test
public void testKeyStoreTrustStoreValidation() throws Exception {
File trustStoreFile = File.createTempFile("truststore", ".jks");
Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(false, true,
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index f765f51..2b48170 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -389,6 +389,8 @@ class AdminManager(val config: KafkaConfig,
this.config.dynamicConfig.validate(configProps, perBrokerConfig)
validateConfigPolicy(ConfigResource.Type.BROKER)
if (!validateOnly) {
+ if (perBrokerConfig)
+ this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
adminZkClient.changeBrokerConfig(brokerId,
this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig))
}
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index bcaaa02..2c0f6c1 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -87,6 +87,8 @@ object DynamicBrokerConfig {
DynamicListenerConfig.ReconfigurableConfigs
private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp)
+ private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
+
val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
private val DynamicPasswordConfigs = {
@@ -267,6 +269,27 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
}
+ /**
+ * All config updates through ZooKeeper are triggered through actual changes in values stored in ZooKeeper.
+ * For some configs like SSL keystores and truststores, we also want to reload the store if it was modified
+ * in-place, even though the actual values of the file path and password haven't changed. This scenario alone
+ * is handled here when a config update request using admin client is processed by AdminManager. If any of
+ * the SSL configs have changed, then the update will not be done here, but will be handled later when ZK
+ * changes are processed. At the moment, only listener configs are considered for reloading.
+ */
+ private[server] def reloadUpdatedFilesWithoutConfigChange(newProps: Properties): Unit = CoreUtils.inWriteLock(lock) {
+ reconfigurables
+ .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+ .foreach {
+ case reconfigurable: ListenerReconfigurable =>
+ val kafkaProps = validatedKafkaProps(newProps, perBrokerConfig = true)
+ val newConfig = new KafkaConfig(kafkaProps.asJava, false, None)
+ processListenerReconfigurable(reconfigurable, newConfig, Collections.emptyMap(), validateOnly = false, reloadOnly = true)
+ case reconfigurable =>
+ trace(s"Files will not be reloaded without config change for $reconfigurable")
+ }
+ }
+
private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
secret.map { secret =>
new PasswordEncoder(secret,
@@ -355,17 +378,28 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
props
}
- private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) {
- validateConfigs(props, perBrokerConfig)
+ /**
+ * Validate the provided configs `propsOverride` and return the full Kafka configs with
+ * the configured defaults and these overrides.
+ *
+ * Note: The caller must acquire the read or write lock before invoking this method.
+ */
+ private def validatedKafkaProps(propsOverride: Properties, perBrokerConfig: Boolean): Map[String, String] = {
+ validateConfigs(propsOverride, perBrokerConfig)
val newProps = mutable.Map[String, String]()
newProps ++= staticBrokerConfigs
if (perBrokerConfig) {
overrideProps(newProps, dynamicDefaultConfigs)
- overrideProps(newProps, props.asScala)
+ overrideProps(newProps, propsOverride.asScala)
} else {
- overrideProps(newProps, props.asScala)
+ overrideProps(newProps, propsOverride.asScala)
overrideProps(newProps, dynamicBrokerConfigs)
}
+ newProps
+ }
+
+ private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) {
+ val newProps = validatedKafkaProps(props, perBrokerConfig)
processReconfiguration(newProps, validateOnly = true)
}
@@ -445,12 +479,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
newConfig.valuesFromThisConfig.keySet.asScala.foreach(customConfigs.remove)
reconfigurables.foreach {
case listenerReconfigurable: ListenerReconfigurable =>
- val listenerName = listenerReconfigurable.listenerName
- val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
- val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
- val updatedKeys = updatedConfigs(newValues, oldValues).keySet
- if (needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys))
- processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
+ processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false)
case reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet))
processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
@@ -481,6 +510,21 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty
}
+ private def processListenerReconfigurable(listenerReconfigurable: ListenerReconfigurable,
+ newConfig: KafkaConfig,
+ customConfigs: util.Map[String, Object],
+ validateOnly: Boolean,
+ reloadOnly: Boolean): Unit = {
+ val listenerName = listenerReconfigurable.listenerName
+ val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
+ val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
+ val updatedKeys = updatedConfigs(newValues, oldValues).keySet
+ val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys)
+ // If `reloadOnly`, reconfigure only when configs haven't changed; otherwise reconfigure only when configs have changed.
+ if (reloadOnly != configsChanged)
+ processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
+ }
+
private def processReconfigurable(reconfigurable: Reconfigurable,
updatedConfigNames: Set[String],
allNewConfigs: util.Map[String, _],
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index f772e58..5d15cc4 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -300,6 +300,15 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
(s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)))
verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build())
verifySslProduceConsume(sslProperties1, "alter-truststore-3")
+
+ // Update same truststore file to contain both certificates without changing any configs.
+ // Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes.
+ Files.copy(Paths.get(combinedStoreProps.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
+ Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
+ StandardCopyOption.REPLACE_EXISTING)
+ TestUtils.alterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true)
+ verifySslProduceConsume(sslProperties1, "alter-truststore-4")
+ verifySslProduceConsume(sslProperties2, "alter-truststore-5")
}
@Test