You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2018/10/02 19:59:29 UTC

[kafka] branch trunk updated: KAFKA-7429: Enable key/truststore update with same filename/password (#5699)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8fb5e63  KAFKA-7429: Enable key/truststore update with same filename/password (#5699)
8fb5e63 is described below

commit 8fb5e63aa88019216e95fdbe0b6874c723b64bb4
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Oct 2 20:57:31 2018 +0100

    KAFKA-7429: Enable key/truststore update with same filename/password (#5699)
---
 .../kafka/common/security/ssl/SslFactory.java      | 31 ++++++++++-
 .../kafka/common/security/ssl/SslFactoryTest.java  | 46 ++++++++++++++++
 .../src/main/scala/kafka/server/AdminManager.scala |  2 +
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 64 ++++++++++++++++++----
 .../server/DynamicBrokerReconfigurationTest.scala  |  9 +++
 5 files changed, 141 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index b1f7df8..b9b5203 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
@@ -47,6 +49,7 @@ import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
@@ -54,8 +57,9 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.HashSet;
 
-
 public class SslFactory implements Reconfigurable {
+    private static final Logger log = LoggerFactory.getLogger(SslFactory.class);
+
     private final Mode mode;
     private final String clientAuthConfigOverride;
     private final boolean keystoreVerifiableUsingTruststore;
@@ -183,6 +187,9 @@ public class SslFactory implements Reconfigurable {
                 !Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), keystore.password) ||
                 !Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), keystore.keyPassword);
 
+        if (!keystoreChanged) {
+            keystoreChanged = keystore.modified();
+        }
         if (keystoreChanged) {
             return createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
                     (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
@@ -197,6 +204,9 @@ public class SslFactory implements Reconfigurable {
                 !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), truststore.path) ||
                 !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), truststore.password);
 
+        if (!truststoreChanged) {
+            truststoreChanged = truststore.modified();
+        }
         if (truststoreChanged) {
             return createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
                     (String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
@@ -306,6 +316,7 @@ public class SslFactory implements Reconfigurable {
         private final String path;
         private final Password password;
         private final Password keyPassword;
+        private Long fileLastModifiedMs;
 
         SecurityStore(String type, String path, Password password, Password keyPassword) {
             Objects.requireNonNull(type, "type must not be null");
@@ -327,11 +338,29 @@ public class SslFactory implements Reconfigurable {
                 // If a password is not set access to the truststore is still available, but integrity checking is disabled.
                 char[] passwordChars = password != null ? password.value().toCharArray() : null;
                 ks.load(in, passwordChars);
+                fileLastModifiedMs = lastModifiedMs(path);
+
+                log.debug("Loaded key store with path {} modification time {}", path,
+                        fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs));
                 return ks;
             } catch (GeneralSecurityException | IOException e) {
                 throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e);
             }
         }
+
+        private Long lastModifiedMs(String path) { // Returns the last-modified time of the file at `path` in milliseconds, or null if it cannot be read.
+            try {
+                return Files.getLastModifiedTime(Paths.get(path)).toMillis();
+            } catch (IOException e) {
+                log.error("Modification time of key store could not be obtained: " + path, e);
+                return null; // Best effort: the failure is logged rather than propagated; null tells callers the time is unknown.
+            }
+        }
+
+        boolean modified() { // True only if the store file's current modification time is readable and differs from fileLastModifiedMs.
+            Long modifiedMs = lastModifiedMs(path);
+            return modifiedMs != null && !Objects.equals(modifiedMs, this.fileLastModifiedMs); // An unreadable time (null) is treated as "not modified".
+        }
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index a134104..97021e3 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.security.ssl;
 
 import java.io.File;
+import java.nio.file.Files;
 import java.security.KeyStore;
 import java.util.Map;
 
@@ -32,8 +33,11 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -82,6 +86,48 @@ public class SslFactoryTest {
     }
 
     @Test
+    public void testReconfiguration() throws Exception { // Checks when reconfigure() keeps the existing SSLContext and when it creates a new one.
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+        Map<String, Object> sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
+        SslFactory sslFactory = new SslFactory(Mode.SERVER);
+        sslFactory.configure(sslConfig);
+        SSLContext sslContext = sslFactory.sslContext();
+        assertNotNull("SSL context not created", sslContext);
+        assertSame("SSL context recreated unnecessarily", sslContext, sslFactory.sslContext());
+        assertFalse(sslContext.createSSLEngine("localhost", 0).getUseClientMode()); // Factory was created with Mode.SERVER, so engines must not be in client mode.
+
+        // Verify that the context is not recreated on reconfigure() if neither the config nor the store files have changed
+        sslFactory.reconfigure(sslConfig);
+        assertSame("SSL context recreated unnecessarily", sslContext, sslFactory.sslContext());
+
+        // Verify that the context is recreated on reconfigure() if the config has changed (new truststore file)
+        trustStoreFile = File.createTempFile("truststore", ".jks");
+        sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
+        sslFactory.reconfigure(sslConfig);
+        assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext());
+        sslContext = sslFactory.sslContext();
+
+        // Verify that the context is recreated on reconfigure() if the config is unchanged, but the truststore file was modified
+        trustStoreFile.setLastModified(System.currentTimeMillis() + 10000); // Move the modification time 10s ahead of now so it differs from the time recorded at load.
+        sslFactory.reconfigure(sslConfig);
+        assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext());
+        sslContext = sslFactory.sslContext();
+
+        // Verify that the context is recreated on reconfigure() if the config is unchanged, but the keystore file was modified
+        File keyStoreFile = new File((String) sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
+        keyStoreFile.setLastModified(System.currentTimeMillis() + 10000);
+        sslFactory.reconfigure(sslConfig);
+        assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext());
+        sslContext = sslFactory.sslContext();
+
+        // Verify that the context is not recreated if the modification time cannot be determined
+        keyStoreFile.setLastModified(System.currentTimeMillis() + 20000);
+        Files.delete(keyStoreFile.toPath()); // With the file deleted, reading its modification time fails.
+        sslFactory.reconfigure(sslConfig);
+        assertSame("SSL context recreated unnecessarily", sslContext, sslFactory.sslContext());
+    }
+
+    @Test
     public void testKeyStoreTrustStoreValidation() throws Exception {
         File trustStoreFile = File.createTempFile("truststore", ".jks");
         Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(false, true,
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index f765f51..2b48170 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -389,6 +389,8 @@ class AdminManager(val config: KafkaConfig,
             this.config.dynamicConfig.validate(configProps, perBrokerConfig)
             validateConfigPolicy(ConfigResource.Type.BROKER)
             if (!validateOnly) {
+              if (perBrokerConfig)
+                this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
               adminZkClient.changeBrokerConfig(brokerId,
                 this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig))
             }
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index bcaaa02..2c0f6c1 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -87,6 +87,8 @@ object DynamicBrokerConfig {
     DynamicListenerConfig.ReconfigurableConfigs
   private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp)
 
+  private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
+
   val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
 
   private val DynamicPasswordConfigs = {
@@ -267,6 +269,27 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     }
   }
 
+  /**
+   * Config updates through ZooKeeper are triggered only by actual changes to the values stored in ZooKeeper.
+   * For some configs, such as SSL keystores and truststores, we also want to reload the store if the file was
+   * modified in place, even though the file path and password values have not changed. Only that scenario is
+   * handled here, when a config update request from the admin client is processed by AdminManager. If any of
+   * the SSL configs have changed, the update is not performed here; it is handled later when the ZK changes
+   * are processed. At the moment, only listener configs are considered for reloading.
+   */
+  private[server] def reloadUpdatedFilesWithoutConfigChange(newProps: Properties): Unit = CoreUtils.inWriteLock(lock) {
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains)) // keep only reconfigurables that declare a keystore/truststore location config
+      .foreach {
+        case reconfigurable: ListenerReconfigurable =>
+          val kafkaProps = validatedKafkaProps(newProps, perBrokerConfig = true)
+          val newConfig = new KafkaConfig(kafkaProps.asJava, false, None)
+          processListenerReconfigurable(reconfigurable, newConfig, Collections.emptyMap(), validateOnly = false, reloadOnly = true) // reloadOnly = true: reconfigure only when this listener's configs are unchanged
+        case reconfigurable =>
+          trace(s"Files will not be reloaded without config change for $reconfigurable")
+      }
+  }
+
   private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
    secret.map { secret =>
       new PasswordEncoder(secret,
@@ -355,17 +378,28 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     props
   }
 
-  private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) {
-    validateConfigs(props, perBrokerConfig)
+  /**
+   * Validate the provided configs `propsOverride` and return the full Kafka configs with
+   * the configured defaults and these overrides.
+   *
+   * Note: The caller must acquire the read or write lock before invoking this method.
+   */
+  private def validatedKafkaProps(propsOverride: Properties, perBrokerConfig: Boolean): Map[String, String] = {
+    validateConfigs(propsOverride, perBrokerConfig)
     val newProps = mutable.Map[String, String]()
     newProps ++= staticBrokerConfigs
     if (perBrokerConfig) {
       overrideProps(newProps, dynamicDefaultConfigs)
-      overrideProps(newProps, props.asScala)
+      overrideProps(newProps, propsOverride.asScala)
     } else {
-      overrideProps(newProps, props.asScala)
+      overrideProps(newProps, propsOverride.asScala)
       overrideProps(newProps, dynamicBrokerConfigs)
     }
+    newProps
+  }
+
+  private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) {
+    val newProps = validatedKafkaProps(props, perBrokerConfig)
     processReconfiguration(newProps, validateOnly = true)
   }
 
@@ -445,12 +479,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
         newConfig.valuesFromThisConfig.keySet.asScala.foreach(customConfigs.remove)
         reconfigurables.foreach {
           case listenerReconfigurable: ListenerReconfigurable =>
-            val listenerName = listenerReconfigurable.listenerName
-            val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
-            val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
-            val updatedKeys = updatedConfigs(newValues, oldValues).keySet
-            if (needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys))
-              processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
+            processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false)
           case reconfigurable =>
             if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet))
               processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
@@ -481,6 +510,21 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty
   }
 
+  private def processListenerReconfigurable(listenerReconfigurable: ListenerReconfigurable, // Reconfigures one listener, comparing current and new configs under the listener's prefix.
+                                            newConfig: KafkaConfig,
+                                            customConfigs: util.Map[String, Object],
+                                            validateOnly: Boolean,
+                                            reloadOnly:  Boolean): Unit = {
+    val listenerName = listenerReconfigurable.listenerName
+    val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
+    val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
+    val updatedKeys = updatedConfigs(newValues, oldValues).keySet
+    val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys)
+    // If `reloadOnly`, reconfigure only when the configs have NOT changed; otherwise reconfigure only when they have changed.
+    if (reloadOnly != configsChanged)
+      processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
+  }
+
   private def processReconfigurable(reconfigurable: Reconfigurable,
                                     updatedConfigNames: Set[String],
                                     allNewConfigs: util.Map[String, _],
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index f772e58..5d15cc4 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -300,6 +300,15 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       (s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)))
     verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build())
     verifySslProduceConsume(sslProperties1, "alter-truststore-3")
+
+    // Update same truststore file to contain both certificates without changing any configs.
+    // Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes.
+    Files.copy(Paths.get(combinedStoreProps.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
+      Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
+      StandardCopyOption.REPLACE_EXISTING)
+    TestUtils.alterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true)
+    verifySslProduceConsume(sslProperties1, "alter-truststore-4")
+    verifySslProduceConsume(sslProperties2, "alter-truststore-5")
   }
 
   @Test