You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/22 20:13:33 UTC
[kafka] branch 1.1 updated: MINOR: Fix
AdminClient.describeConfigs() of listener configs (#4747)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 919e2f6 MINOR: Fix AdminClient.describeConfigs() of listener configs (#4747)
919e2f6 is described below
commit 919e2f6d17f0ee9e14f297748444b9a00ac58690
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Thu Mar 22 20:05:45 2018 +0000
MINOR: Fix AdminClient.describeConfigs() of listener configs (#4747)
Don't return config values from `describeConfigs` if the config type cannot be determined. Obtain config types correctly for listener configs for `describeConfigs` and password encryption.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../src/main/scala/kafka/server/AdminManager.scala | 10 ++++--
.../scala/kafka/server/DynamicBrokerConfig.scala | 27 ++++++++------
.../server/DynamicBrokerReconfigurationTest.scala | 41 ++++++++++++++++------
.../kafka/server/DynamicBrokerConfigTest.scala | 18 ++++++++--
4 files changed, 71 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 8264f7c..d27b0c5 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -473,8 +473,14 @@ class AdminManager(val config: KafkaConfig,
(name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
val allNames = brokerSynonyms(name)
val configEntryType = configType(name, allNames)
- val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
- val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)
+ // If we can't determine the config entry type, treat it as a sensitive config to be safe
+ val isSensitive = configEntryType == ConfigDef.Type.PASSWORD || configEntryType == null
+ val valueAsString = if (isSensitive)
+ null
+ else value match {
+ case v: String => v
+ case _ => ConfigDef.convertToString(value, configEntryType)
+ }
val allSynonyms = configSynonyms(name, allNames, isSensitive)
.filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)
val synonyms = if (!includeSynonyms) List.empty else allSynonyms
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 92fd5d7..766907a 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -85,14 +85,17 @@ object DynamicBrokerConfig {
private val PerBrokerConfigs = DynamicSecurityConfigs ++
DynamicListenerConfig.ReconfigurableConfigs
+ private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp)
val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
- private[server] val DynamicPasswordConfigs = {
+ private val DynamicPasswordConfigs = {
val passwordConfigs = KafkaConfig.configKeys.filter(_._2.`type` == ConfigDef.Type.PASSWORD).keySet
AllDynamicConfigs.intersect(passwordConfigs)
}
+ def isPasswordConfig(name: String): Boolean = DynamicBrokerConfig.DynamicPasswordConfigs.exists(name.endsWith)
+
def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = {
name match {
case KafkaConfig.LogRollTimeMillisProp | KafkaConfig.LogRollTimeHoursProp =>
@@ -103,7 +106,12 @@ object DynamicBrokerConfig {
List(KafkaConfig.LogFlushIntervalMsProp, KafkaConfig.LogFlushSchedulerIntervalMsProp)
case KafkaConfig.LogRetentionTimeMillisProp | KafkaConfig.LogRetentionTimeMinutesProp | KafkaConfig.LogRetentionTimeHoursProp =>
List(KafkaConfig.LogRetentionTimeMillisProp, KafkaConfig.LogRetentionTimeMinutesProp, KafkaConfig.LogRetentionTimeHoursProp)
- case ListenerConfigRegex(baseName) if matchListenerOverride => List(name, baseName)
+ case ListenerConfigRegex(baseName) if matchListenerOverride =>
+ // `ListenerMechanismConfigs` are specified as listenerPrefix.mechanism.<configName>
+ // and other listener configs are specified as listenerPrefix.<configName>
+ // Add <configName> as a synonym in both cases.
+ val mechanismConfig = ListenerMechanismConfigs.find(baseName.endsWith)
+ List(name, mechanismConfig.getOrElse(baseName))
case _ => List(name)
}
}
@@ -220,15 +228,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private[server] def toPersistentProps(configProps: Properties, perBrokerConfig: Boolean): Properties = {
val props = configProps.clone().asInstanceOf[Properties]
- def encodePassword(configName: String): Unit = {
- val value = props.getProperty(configName)
+ def encodePassword(configName: String, value: String): Unit = {
if (value != null) {
if (!perBrokerConfig)
throw new ConfigException("Password config can be defined only at broker level")
props.setProperty(configName, passwordEncoder.encode(new Password(value)))
}
}
- DynamicPasswordConfigs.foreach(encodePassword)
+ configProps.asScala.filterKeys(isPasswordConfig).foreach { case (name, value) => encodePassword(name, value) }
props
}
@@ -250,8 +257,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
if (!perBrokerConfig)
removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored")
- def decodePassword(configName: String): Unit = {
- val value = props.getProperty(configName)
+ def decodePassword(configName: String, value: String): Unit = {
if (value != null) {
try {
props.setProperty(configName, passwordEncoder.decode(value).value)
@@ -263,7 +269,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
}
- DynamicPasswordConfigs.foreach(decodePassword)
+ props.asScala.filterKeys(isPasswordConfig).foreach { case (name, value) => decodePassword(name, value) }
props
}
@@ -273,10 +279,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
// have been removed during broker restart.
private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = {
val props = persistentProps.clone().asInstanceOf[Properties]
- if (!props.asScala.keySet.exists(DynamicPasswordConfigs.contains)) {
+ if (props.asScala.keySet.exists(isPasswordConfig)) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder =>
- DynamicPasswordConfigs.foreach { configName =>
- val value = props.getProperty(configName)
+ persistentProps.asScala.filterKeys(isPasswordConfig).foreach { case (configName, value) =>
if (value != null) {
val decoded = try {
Some(passwordDecoder.decode(value).value)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 833a98e..e207de8 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -667,10 +667,12 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
// ZK with newly encoded values using password.encoder.secret.
servers.foreach { server =>
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
+ val propsEncodedWithOldSecret = props.clone().asInstanceOf[Properties]
val config = server.config
val secret = config.passwordEncoderSecret.getOrElse(throw new IllegalStateException("Password encoder secret not configured"))
val oldSecret = config.passwordEncoderOldSecret.getOrElse(throw new IllegalStateException("Password encoder old secret not configured"))
- val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.DynamicPasswordConfigs.contains)
+ val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.isPasswordConfig)
+ assertTrue("Password configs not found", passwordConfigs.nonEmpty)
val passwordDecoder = new PasswordEncoder(secret,
config.passwordEncoderKeyFactoryAlgorithm,
config.passwordEncoderCipherAlgorithm,
@@ -682,18 +684,19 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
config.passwordEncoderKeyLength,
config.passwordEncoderIterations)
passwordConfigs.foreach { case (name, value) =>
- val decoded = passwordDecoder.decode(value).value
- props.put(name, passwordEncoder.encode(new Password(decoded)))
+ val decoded = passwordDecoder.decode(value).value
+ propsEncodedWithOldSecret.put(name, passwordEncoder.encode(new Password(decoded)))
}
val brokerId = server.config.brokerId
- adminZkClient.changeBrokerConfig(Seq(brokerId), props)
+ adminZkClient.changeBrokerConfig(Seq(brokerId), propsEncodedWithOldSecret)
val updatedProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
- passwordConfigs.foreach { case (name, value) => assertNotEquals(value, updatedProps.get(name)) }
+ passwordConfigs.foreach { case (name, value) => assertNotEquals(props.get(value), updatedProps.get(name)) }
server.startup()
TestUtils.retry(10000) {
val newProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
- passwordConfigs.foreach { case (name, value) => assertEquals(value, newProps.get(name)) }
+ passwordConfigs.foreach { case (name, value) =>
+ assertEquals(passwordDecoder.decode(value), passwordDecoder.decode(newProps.getProperty(name))) }
}
}
@@ -725,7 +728,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
.map { case (name, protocol) => s"${name.value}:${protocol.name}" }
.mkString(",") + s",$listenerName:${securityProtocol.name}"
- val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString)
+ val props = fetchBrokerConfigsFromZooKeeper(servers.head)
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap)
securityProtocol match {
@@ -739,6 +742,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
case SecurityProtocol.PLAINTEXT => // no additional props
}
+ // Add a config to verify that configs whose types are not known are not returned by describeConfigs()
+ val unknownConfig = "some.config"
+ props.put(unknownConfig, "some.config.value")
+
alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get
TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount + 1),
@@ -755,6 +762,15 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
saslMechanisms.foreach(mechanism => verifyListener(securityProtocol, Some(mechanism)))
else
verifyListener(securityProtocol, None)
+
+ val brokerConfigs = describeConfig(adminClients.head).entries.asScala
+ props.asScala.foreach { case (name, value) =>
+ val entry = brokerConfigs.find(_.name == name).getOrElse(throw new IllegalArgumentException(s"Config not found $name"))
+ if (DynamicBrokerConfig.isPasswordConfig(name) || name == unknownConfig)
+ assertNull(s"Password or unknown config returned $entry", entry.value)
+ else
+ assertEquals(value, entry.value)
+ }
}
private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol,
@@ -778,7 +794,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
.map { case (listenerName, protocol) => s"${listenerName.value}:${protocol.name}" }
.mkString(",")
- val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString)
+ val props = fetchBrokerConfigsFromZooKeeper(servers.head)
val listenerProps = props.asScala.keySet.filter(_.startsWith(new ListenerName(listenerName).configPrefix))
listenerProps.foreach(props.remove)
props.put(KafkaConfig.ListenersProp, listeners)
@@ -811,6 +827,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
verifyProduceConsume(producer, consumer, numRecords = 10, topic)
}
+ private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties = {
+ val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
+ server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = true)
+ }
+
private def bootstrapServers: String = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
private def createProducer(trustStore: File, retries: Int,
@@ -1090,9 +1111,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private def configureDynamicKeystoreInZooKeeper(kafkaConfig: KafkaConfig, brokers: Seq[Int], sslProperties: Properties): Unit = {
val keystoreProps = new Properties
addKeystoreWithListenerPrefix(sslProperties, keystoreProps, SecureExternal)
- kafkaConfig.dynamicConfig.toPersistentProps(keystoreProps, perBrokerConfig = true)
+ val persistentProps = kafkaConfig.dynamicConfig.toPersistentProps(keystoreProps, perBrokerConfig = true)
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
- adminZkClient.changeBrokerConfig(brokers, keystoreProps)
+ adminZkClient.changeBrokerConfig(brokers, persistentProps)
}
private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index bca98d2..5c88bf2 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -142,14 +142,16 @@ class DynamicBrokerConfigTest {
}
private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) {
- val config = KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181))
+ val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret")
+ val config = KafkaConfig(configProps)
val props = new Properties
props.put(name, value)
val oldValue = config.originals.get(name)
def updateConfig() = {
if (perBrokerConfig)
- config.dynamicConfig.updateBrokerConfig(0, props)
+ config.dynamicConfig.updateBrokerConfig(0, config.dynamicConfig.toPersistentProps(props, perBrokerConfig))
else
config.dynamicConfig.updateDefaultConfig(props)
}
@@ -267,6 +269,18 @@ class DynamicBrokerConfigTest {
}
@Test
+ def testSynonyms(): Unit = {
+ assertEquals(List("listener.name.secure.ssl.keystore.type", "ssl.keystore.type"),
+ DynamicBrokerConfig.brokerConfigSynonyms("listener.name.secure.ssl.keystore.type", matchListenerOverride = true))
+ assertEquals(List("listener.name.sasl_ssl.plain.sasl.jaas.config", "sasl.jaas.config"),
+ DynamicBrokerConfig.brokerConfigSynonyms("listener.name.sasl_ssl.plain.sasl.jaas.config", matchListenerOverride = true))
+ assertEquals(List("some.config"),
+ DynamicBrokerConfig.brokerConfigSynonyms("some.config", matchListenerOverride = true))
+ assertEquals(List(KafkaConfig.LogRollTimeMillisProp, KafkaConfig.LogRollTimeHoursProp),
+ DynamicBrokerConfig.brokerConfigSynonyms(KafkaConfig.LogRollTimeMillisProp, matchListenerOverride = true))
+ }
+
+ @Test
def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new java.util.Properties()).anyTimes()
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.