You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/22 20:05:50 UTC

[kafka] branch trunk updated: MINOR: Fix AdminClient.describeConfigs() of listener configs (#4747)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 57b1c28  MINOR: Fix AdminClient.describeConfigs() of listener configs (#4747)
57b1c28 is described below

commit 57b1c28d60e05f2cf7394ef727a86df92b1bb02b
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Thu Mar 22 20:05:45 2018 +0000

    MINOR: Fix AdminClient.describeConfigs() of listener configs (#4747)
    
    Don't return config values from `describeConfigs` if the config type cannot be determined. Obtain config types correctly for listener configs for `describeConfigs` and password encryption.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../src/main/scala/kafka/server/AdminManager.scala | 10 ++++--
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 27 ++++++++------
 .../server/DynamicBrokerReconfigurationTest.scala  | 41 ++++++++++++++++------
 .../kafka/server/DynamicBrokerConfigTest.scala     | 18 ++++++++--
 4 files changed, 71 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 9014fab..b54defc 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -473,8 +473,14 @@ class AdminManager(val config: KafkaConfig,
                                      (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
     val allNames = brokerSynonyms(name)
     val configEntryType = configType(name, allNames)
-    val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
-    val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)
+    // If we can't determine the config entry type, treat it as a sensitive config to be safe
+    val isSensitive = configEntryType == ConfigDef.Type.PASSWORD || configEntryType == null
+    val valueAsString = if (isSensitive)
+      null
+    else value match {
+      case v: String => v
+      case _ => ConfigDef.convertToString(value, configEntryType)
+    }
     val allSynonyms = configSynonyms(name, allNames, isSensitive)
         .filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)
     val synonyms = if (!includeSynonyms) List.empty else allSynonyms
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 92fd5d7..766907a 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -85,14 +85,17 @@ object DynamicBrokerConfig {
 
   private val PerBrokerConfigs = DynamicSecurityConfigs  ++
     DynamicListenerConfig.ReconfigurableConfigs
+  private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp)
 
   val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
 
-  private[server] val DynamicPasswordConfigs = {
+  private val DynamicPasswordConfigs = {
     val passwordConfigs = KafkaConfig.configKeys.filter(_._2.`type` == ConfigDef.Type.PASSWORD).keySet
     AllDynamicConfigs.intersect(passwordConfigs)
   }
 
+  def isPasswordConfig(name: String): Boolean = DynamicBrokerConfig.DynamicPasswordConfigs.exists(name.endsWith)
+
   def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = {
     name match {
       case KafkaConfig.LogRollTimeMillisProp | KafkaConfig.LogRollTimeHoursProp =>
@@ -103,7 +106,12 @@ object DynamicBrokerConfig {
         List(KafkaConfig.LogFlushIntervalMsProp, KafkaConfig.LogFlushSchedulerIntervalMsProp)
       case KafkaConfig.LogRetentionTimeMillisProp | KafkaConfig.LogRetentionTimeMinutesProp | KafkaConfig.LogRetentionTimeHoursProp =>
         List(KafkaConfig.LogRetentionTimeMillisProp, KafkaConfig.LogRetentionTimeMinutesProp, KafkaConfig.LogRetentionTimeHoursProp)
-      case ListenerConfigRegex(baseName) if matchListenerOverride => List(name, baseName)
+      case ListenerConfigRegex(baseName) if matchListenerOverride =>
+        // `ListenerMechanismConfigs` are specified as listenerPrefix.mechanism.<configName>
+        // and other listener configs are specified as listenerPrefix.<configName>
+        // Add <configName> as a synonym in both cases.
+        val mechanismConfig = ListenerMechanismConfigs.find(baseName.endsWith)
+        List(name, mechanismConfig.getOrElse(baseName))
       case _ => List(name)
     }
   }
@@ -220,15 +228,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private[server] def toPersistentProps(configProps: Properties, perBrokerConfig: Boolean): Properties = {
     val props = configProps.clone().asInstanceOf[Properties]
 
-    def encodePassword(configName: String): Unit = {
-      val value = props.getProperty(configName)
+    def encodePassword(configName: String, value: String): Unit = {
       if (value != null) {
         if (!perBrokerConfig)
           throw new ConfigException("Password config can be defined only at broker level")
         props.setProperty(configName, passwordEncoder.encode(new Password(value)))
       }
     }
-    DynamicPasswordConfigs.foreach(encodePassword)
+    configProps.asScala.filterKeys(isPasswordConfig).foreach { case (name, value) => encodePassword(name, value) }
     props
   }
 
@@ -250,8 +257,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     if (!perBrokerConfig)
       removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored")
 
-    def decodePassword(configName: String): Unit = {
-      val value = props.getProperty(configName)
+    def decodePassword(configName: String, value: String): Unit = {
       if (value != null) {
         try {
           props.setProperty(configName, passwordEncoder.decode(value).value)
@@ -263,7 +269,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
     }
 
-    DynamicPasswordConfigs.foreach(decodePassword)
+    props.asScala.filterKeys(isPasswordConfig).foreach { case (name, value) => decodePassword(name, value) }
     props
   }
 
@@ -273,10 +279,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   // have been removed during broker restart.
   private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = {
     val props = persistentProps.clone().asInstanceOf[Properties]
-    if (!props.asScala.keySet.exists(DynamicPasswordConfigs.contains)) {
+    if (props.asScala.keySet.exists(isPasswordConfig)) {
       maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder =>
-        DynamicPasswordConfigs.foreach { configName =>
-          val value = props.getProperty(configName)
+        persistentProps.asScala.filterKeys(isPasswordConfig).foreach { case (configName, value) =>
           if (value != null) {
             val decoded = try {
               Some(passwordDecoder.decode(value).value)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 833a98e..e207de8 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -667,10 +667,12 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     // ZK with newly encoded values using password.encoder.secret.
     servers.foreach { server =>
       val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
+      val propsEncodedWithOldSecret = props.clone().asInstanceOf[Properties]
       val config = server.config
       val secret = config.passwordEncoderSecret.getOrElse(throw new IllegalStateException("Password encoder secret not configured"))
       val oldSecret = config.passwordEncoderOldSecret.getOrElse(throw new IllegalStateException("Password encoder old secret not configured"))
-      val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.DynamicPasswordConfigs.contains)
+      val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.isPasswordConfig)
+      assertTrue("Password configs not found", passwordConfigs.nonEmpty)
       val passwordDecoder = new PasswordEncoder(secret,
         config.passwordEncoderKeyFactoryAlgorithm,
         config.passwordEncoderCipherAlgorithm,
@@ -682,18 +684,19 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
         config.passwordEncoderKeyLength,
         config.passwordEncoderIterations)
       passwordConfigs.foreach { case (name, value) =>
-          val decoded = passwordDecoder.decode(value).value
-          props.put(name, passwordEncoder.encode(new Password(decoded)))
+        val decoded = passwordDecoder.decode(value).value
+        propsEncodedWithOldSecret.put(name, passwordEncoder.encode(new Password(decoded)))
       }
       val brokerId = server.config.brokerId
-      adminZkClient.changeBrokerConfig(Seq(brokerId), props)
+      adminZkClient.changeBrokerConfig(Seq(brokerId), propsEncodedWithOldSecret)
       val updatedProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
-      passwordConfigs.foreach { case (name, value) => assertNotEquals(value, updatedProps.get(name)) }
+      passwordConfigs.foreach { case (name, value) => assertNotEquals(props.get(value), updatedProps.get(name)) }
 
       server.startup()
       TestUtils.retry(10000) {
         val newProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
-        passwordConfigs.foreach { case (name, value) => assertEquals(value, newProps.get(name)) }
+        passwordConfigs.foreach { case (name, value) =>
+          assertEquals(passwordDecoder.decode(value), passwordDecoder.decode(newProps.getProperty(name))) }
       }
     }
 
@@ -725,7 +728,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       .map { case (name, protocol) => s"${name.value}:${protocol.name}" }
       .mkString(",") + s",$listenerName:${securityProtocol.name}"
 
-    val props =  adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString)
+    val props = fetchBrokerConfigsFromZooKeeper(servers.head)
     props.put(KafkaConfig.ListenersProp, listeners)
     props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap)
     securityProtocol match {
@@ -739,6 +742,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       case SecurityProtocol.PLAINTEXT => // no additional props
     }
 
+    // Add a config to verify that configs whose types are not known are not returned by describeConfigs()
+    val unknownConfig = "some.config"
+    props.put(unknownConfig, "some.config.value")
+
     alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get
 
     TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount + 1),
@@ -755,6 +762,15 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       saslMechanisms.foreach(mechanism => verifyListener(securityProtocol, Some(mechanism)))
     else
       verifyListener(securityProtocol, None)
+
+    val brokerConfigs = describeConfig(adminClients.head).entries.asScala
+    props.asScala.foreach { case (name, value) =>
+      val entry = brokerConfigs.find(_.name == name).getOrElse(throw new IllegalArgumentException(s"Config not found $name"))
+      if (DynamicBrokerConfig.isPasswordConfig(name) || name == unknownConfig)
+        assertNull(s"Password or unknown config returned $entry", entry.value)
+      else
+        assertEquals(value, entry.value)
+    }
   }
 
   private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol,
@@ -778,7 +794,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       .map { case (listenerName, protocol) => s"${listenerName.value}:${protocol.name}" }
       .mkString(",")
 
-    val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString)
+    val props = fetchBrokerConfigsFromZooKeeper(servers.head)
     val listenerProps = props.asScala.keySet.filter(_.startsWith(new ListenerName(listenerName).configPrefix))
     listenerProps.foreach(props.remove)
     props.put(KafkaConfig.ListenersProp, listeners)
@@ -811,6 +827,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
   }
 
+  private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties = {
+    val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
+    server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = true)
+  }
+
   private def bootstrapServers: String = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
 
   private def createProducer(trustStore: File, retries: Int,
@@ -1090,9 +1111,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   private def configureDynamicKeystoreInZooKeeper(kafkaConfig: KafkaConfig, brokers: Seq[Int], sslProperties: Properties): Unit = {
     val keystoreProps = new Properties
     addKeystoreWithListenerPrefix(sslProperties, keystoreProps, SecureExternal)
-    kafkaConfig.dynamicConfig.toPersistentProps(keystoreProps, perBrokerConfig = true)
+    val persistentProps = kafkaConfig.dynamicConfig.toPersistentProps(keystoreProps, perBrokerConfig = true)
     zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
-    adminZkClient.changeBrokerConfig(brokers, keystoreProps)
+    adminZkClient.changeBrokerConfig(brokers, persistentProps)
   }
 
   private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index bca98d2..5c88bf2 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -142,14 +142,16 @@ class DynamicBrokerConfigTest {
   }
 
   private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) {
-    val config = KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181))
+    val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret")
+    val config = KafkaConfig(configProps)
     val props = new Properties
     props.put(name, value)
     val oldValue = config.originals.get(name)
 
     def updateConfig() = {
       if (perBrokerConfig)
-        config.dynamicConfig.updateBrokerConfig(0, props)
+        config.dynamicConfig.updateBrokerConfig(0, config.dynamicConfig.toPersistentProps(props, perBrokerConfig))
       else
         config.dynamicConfig.updateDefaultConfig(props)
     }
@@ -267,6 +269,18 @@ class DynamicBrokerConfigTest {
   }
 
   @Test
+  def testSynonyms(): Unit = {
+    assertEquals(List("listener.name.secure.ssl.keystore.type", "ssl.keystore.type"),
+      DynamicBrokerConfig.brokerConfigSynonyms("listener.name.secure.ssl.keystore.type", matchListenerOverride = true))
+    assertEquals(List("listener.name.sasl_ssl.plain.sasl.jaas.config", "sasl.jaas.config"),
+      DynamicBrokerConfig.brokerConfigSynonyms("listener.name.sasl_ssl.plain.sasl.jaas.config", matchListenerOverride = true))
+    assertEquals(List("some.config"),
+      DynamicBrokerConfig.brokerConfigSynonyms("some.config", matchListenerOverride = true))
+    assertEquals(List(KafkaConfig.LogRollTimeMillisProp, KafkaConfig.LogRollTimeHoursProp),
+      DynamicBrokerConfig.brokerConfigSynonyms(KafkaConfig.LogRollTimeMillisProp, matchListenerOverride = true))
+  }
+
+  @Test
   def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
     val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
     EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new java.util.Properties()).anyTimes()

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.