You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/05/05 21:19:15 UTC
[kafka] 01/02: KAFKA-9718;
Don't log passwords for AlterConfigs in request logs (#8294)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 177abc77ee29721164b8633a62d7b259eca4c7b3
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Mar 13 18:24:03 2020 +0000
KAFKA-9718; Don't log passwords for AlterConfigs in request logs (#8294)
Reviewers: Manikumar Reddy <ma...@gmail.com>
(cherry picked from commit f165cdc325388883541db381c4bdfd30da089b3b)
---
core/src/main/scala/kafka/log/LogConfig.scala | 4 +
.../main/scala/kafka/network/RequestChannel.scala | 54 +++++-
.../src/main/scala/kafka/server/AdminManager.scala | 21 +--
.../scala/kafka/server/DynamicBrokerConfig.scala | 6 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 14 ++
.../unit/kafka/network/RequestChannelTest.scala | 195 +++++++++++++++++++++
6 files changed, 277 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 4f26716..59fe81c 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -299,6 +299,10 @@ object LogConfig {
def serverConfigName(configName: String): Option[String] = configDef.serverConfigName(configName)
+ def configType(configName: String): Option[ConfigDef.Type] = {
+ Option(configDef.configKeys.get(configName)).map(_.`type`)
+ }
+
/**
* Create a log config instance using the given properties and defaults
*/
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 531cac1..c9a7d03 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -23,9 +23,15 @@ import java.util.concurrent._
import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.Meter
+import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
import kafka.utils.{Logging, NotNothing, Pool}
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
import org.apache.kafka.common.network.Send
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
@@ -98,7 +104,7 @@ object RequestChannel extends Logging {
releaseBuffer()
}
- def requestDesc(details: Boolean): String = s"$header -- ${body[AbstractRequest].toString(details)}"
+ def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
bodyAndSize.request match {
@@ -108,6 +114,52 @@ object RequestChannel extends Logging {
}
}
+ def loggableRequest: AbstractRequest = {
+
+ def loggableValue(resourceType: ConfigResource.Type, name: String, value: String): String = {
+ val maybeSensitive = resourceType match {
+ case ConfigResource.Type.BROKER => KafkaConfig.maybeSensitive(KafkaConfig.configType(name))
+ case ConfigResource.Type.TOPIC => KafkaConfig.maybeSensitive(LogConfig.configType(name))
+ case ConfigResource.Type.BROKER_LOGGER => false
+ case _ => true
+ }
+ if (maybeSensitive) Password.HIDDEN else value
+ }
+
+ bodyAndSize.request match {
+ case alterConfigs: AlterConfigsRequest =>
+ val loggableConfigs = alterConfigs.configs().asScala.map { case (resource, config) =>
+ val loggableEntries = new AlterConfigsRequest.Config(config.entries.asScala.map { entry =>
+ new AlterConfigsRequest.ConfigEntry(entry.name, loggableValue(resource.`type`, entry.name, entry.value))
+ }.asJavaCollection)
+ (resource, loggableEntries)
+ }.asJava
+ new AlterConfigsRequest.Builder(loggableConfigs, alterConfigs.validateOnly).build(alterConfigs.version())
+
+ case alterConfigs: IncrementalAlterConfigsRequest =>
+ val resources = new AlterConfigsResourceCollection(alterConfigs.data.resources.size)
+ alterConfigs.data().resources().asScala.foreach { resource =>
+ val newResource = new AlterConfigsResource()
+ .setResourceName(resource.resourceName)
+ .setResourceType(resource.resourceType)
+ resource.configs.asScala.foreach { config =>
+ newResource.configs.add(new AlterableConfig()
+ .setName(config.name)
+ .setValue(loggableValue(ConfigResource.Type.forId(resource.resourceType), config.name, config.value))
+ .setConfigOperation(config.configOperation))
+ }
+ resources.add(newResource)
+ }
+ val data = new IncrementalAlterConfigsRequestData()
+ .setValidateOnly(alterConfigs.data().validateOnly())
+ .setResources(resources)
+ new IncrementalAlterConfigsRequest.Builder(data).build(alterConfigs.version)
+
+ case _ =>
+ bodyAndSize.request
+ }
+ }
+
trace(s"Processor $processor received request: ${requestDesc(true)}")
def requestThreadTimeNanos: Long = {
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 43f6e46..72b8d98 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -636,14 +636,6 @@ class AdminManager(val config: KafkaConfig,
DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = true)
}
- private def configType(name: String, synonyms: List[String]): ConfigDef.Type = {
- val configType = config.typeOf(name)
- if (configType != null)
- configType
- else
- synonyms.iterator.map(config.typeOf).find(_ != null).orNull
- }
-
private def configSynonyms(name: String, synonyms: List[String], isSensitive: Boolean): List[DescribeConfigsResponse.ConfigSynonym] = {
val dynamicConfig = config.dynamicConfig
val allSynonyms = mutable.Buffer[DescribeConfigsResponse.ConfigSynonym]()
@@ -664,9 +656,9 @@ class AdminManager(val config: KafkaConfig,
private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean)
(name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
- val configEntryType = logConfig.typeOf(name)
- val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
- val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)
+ val configEntryType = LogConfig.configType(name)
+ val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
+ val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull)
val allSynonyms = {
val list = LogConfig.TopicConfigSynonyms.get(name)
.map(s => configSynonyms(s, brokerSynonyms(s), isSensitive))
@@ -684,14 +676,13 @@ class AdminManager(val config: KafkaConfig,
private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean)
(name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
val allNames = brokerSynonyms(name)
- val configEntryType = configType(name, allNames)
- // If we can't determine the config entry type, treat it as a sensitive config to be safe
- val isSensitive = configEntryType == ConfigDef.Type.PASSWORD || configEntryType == null
+ val configEntryType = KafkaConfig.configType(name)
+ val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
val valueAsString = if (isSensitive)
null
else value match {
case v: String => v
- case _ => ConfigDef.convertToString(value, configEntryType)
+ case _ => ConfigDef.convertToString(value, configEntryType.orNull)
}
val allSynonyms = configSynonyms(name, allNames, isSensitive)
.filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 7508184..92aa048 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -87,7 +87,11 @@ object DynamicBrokerConfig {
private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp)
private val PerBrokerConfigs = DynamicSecurityConfigs ++
DynamicListenerConfig.ReconfigurableConfigs -- ClusterLevelListenerConfigs
- private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp)
+ private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp,
+ KafkaConfig.SaslLoginCallbackHandlerClassProp,
+ KafkaConfig.SaslLoginClassProp,
+ KafkaConfig.SaslServerCallbackHandlerClassProp,
+ KafkaConfig.ConnectionsMaxReauthMsProp)
private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8650041..dc5c3c7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1229,6 +1229,20 @@ object KafkaConfig {
def apply(props: java.util.Map[_, _]): KafkaConfig = new KafkaConfig(props, true)
+ def configType(configName: String): Option[ConfigDef.Type] = {
+ def typeOf(name: String): Option[ConfigDef.Type] = Option(configDef.configKeys.get(name)).map(_.`type`)
+
+ typeOf(configName) match {
+ case Some(t) => Some(t)
+ case None =>
+ DynamicBrokerConfig.brokerConfigSynonyms(configName, matchListenerOverride = true).flatMap(typeOf).headOption
+ }
+ }
+
+ def maybeSensitive(configType: Option[ConfigDef.Type]): Boolean = {
+ // If we can't determine the config entry type, treat it as a sensitive config to be safe
+ configType.isEmpty || configType.contains(ConfigDef.Type.PASSWORD)
+ }
}
class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig])
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
new file mode 100644
index 0000000..7f570e6
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+
+import java.net.InetAddress
+import java.nio.ByteBuffer
+import java.util.Collections
+
+import kafka.network
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, SslConfigs, TopicConfig}
+import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.AlterConfigsRequest._
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.easymock.EasyMock._
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+class RequestChannelTest {
+
+ @Test
+ def testAlterRequests(): Unit = {
+
+ val sensitiveValue = "secret"
+ def verifyConfig(resource: ConfigResource, entries: Seq[ConfigEntry], expectedValues: Map[String, String]): Unit = {
+ val alterConfigs = request(new AlterConfigsRequest.Builder(Collections.singletonMap(resource,
+ new Config(entries.asJavaCollection)), true).build())
+ val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
+ val loggedConfig = loggableAlterConfigs.configs.get(resource)
+ assertEquals(expectedValues, toMap(loggedConfig))
+ val alterConfigsDesc = alterConfigs.requestDesc(details = true)
+ assertFalse(s"Sensitive config logged $alterConfigsDesc", alterConfigsDesc.contains(sensitiveValue))
+ }
+
+ val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+ val keystorePassword = new ConfigEntry(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sensitiveValue)
+ verifyConfig(brokerResource, Seq(keystorePassword), Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
+
+ val keystoreLocation = new ConfigEntry(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore")
+ verifyConfig(brokerResource, Seq(keystoreLocation), Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore"))
+ verifyConfig(brokerResource, Seq(keystoreLocation, keystorePassword),
+ Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore", SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
+
+ val listenerKeyPassword = new ConfigEntry(s"listener.name.internal.${SslConfigs.SSL_KEY_PASSWORD_CONFIG}", sensitiveValue)
+ verifyConfig(brokerResource, Seq(listenerKeyPassword), Map(listenerKeyPassword.name -> Password.HIDDEN))
+
+ val listenerKeystore = new ConfigEntry(s"listener.name.internal.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}", "/path/to/keystore")
+ verifyConfig(brokerResource, Seq(listenerKeystore), Map(listenerKeystore.name -> "/path/to/keystore"))
+
+ val plainJaasConfig = new ConfigEntry(s"listener.name.internal.plain.${SaslConfigs.SASL_JAAS_CONFIG}", sensitiveValue)
+ verifyConfig(brokerResource, Seq(plainJaasConfig), Map(plainJaasConfig.name -> Password.HIDDEN))
+
+ val plainLoginCallback = new ConfigEntry(s"listener.name.internal.plain.${SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS}", "test.LoginClass")
+ verifyConfig(brokerResource, Seq(plainLoginCallback), Map(plainLoginCallback.name -> plainLoginCallback.value))
+
+ val customConfig = new ConfigEntry("custom.config", sensitiveValue)
+ verifyConfig(brokerResource, Seq(customConfig), Map(customConfig.name -> Password.HIDDEN))
+
+ val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "testTopic")
+ val compressionType = new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
+ verifyConfig(topicResource, Seq(compressionType), Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "lz4"))
+ verifyConfig(topicResource, Seq(customConfig), Map(customConfig.name -> Password.HIDDEN))
+
+ // Verify empty request
+ val alterConfigs = request(new AlterConfigsRequest.Builder(Collections.emptyMap[ConfigResource, Config], true).build())
+ assertEquals(Collections.emptyMap, alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest].configs)
+ }
+
+ @Test
+ def testIncrementalAlterRequests(): Unit = {
+
+ def incrementalAlterConfigs(resource: ConfigResource,
+ entries: Map[String, String], op: OpType): IncrementalAlterConfigsRequest = {
+ val data = new IncrementalAlterConfigsRequestData()
+ val alterableConfigs = new AlterableConfigCollection()
+ entries.foreach { case (name, value) =>
+ alterableConfigs.add(new AlterableConfig().setName(name).setValue(value).setConfigOperation(op.id))
+ }
+ data.resources.add(new AlterConfigsResource()
+ .setResourceName(resource.name).setResourceType(resource.`type`.id)
+ .setConfigs(alterableConfigs))
+ new IncrementalAlterConfigsRequest.Builder(data).build()
+ }
+
+ val sensitiveValue = "secret"
+ def verifyConfig(resource: ConfigResource,
+ op: OpType,
+ entries: Map[String, String],
+ expectedValues: Map[String, String]): Unit = {
+ val alterConfigs = request(incrementalAlterConfigs(resource, entries, op))
+ val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
+ val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs
+ assertEquals(expectedValues, toMap(loggedConfig))
+ val alterConfigsDesc = alterConfigs.requestDesc(details = true)
+ assertFalse(s"Sensitive config logged $alterConfigsDesc", alterConfigsDesc.contains(sensitiveValue))
+ }
+
+ val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+ val keystorePassword = Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> sensitiveValue)
+ verifyConfig(brokerResource, OpType.SET, keystorePassword, Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
+
+ val keystoreLocation = Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore")
+ verifyConfig(brokerResource, OpType.SET, keystoreLocation, keystoreLocation)
+ verifyConfig(brokerResource, OpType.SET, keystoreLocation ++ keystorePassword,
+ Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore", SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
+
+ val listenerKeyPassword = Map(s"listener.name.internal.${SslConfigs.SSL_KEY_PASSWORD_CONFIG}" -> sensitiveValue)
+ verifyConfig(brokerResource, OpType.SET, listenerKeyPassword,
+ Map(s"listener.name.internal.${SslConfigs.SSL_KEY_PASSWORD_CONFIG}" -> Password.HIDDEN))
+
+ val listenerKeystore = Map(s"listener.name.internal.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "/path/to/keystore")
+ verifyConfig(brokerResource, OpType.SET, listenerKeystore, listenerKeystore)
+
+ val plainJaasConfig = Map(s"listener.name.internal.plain.${SaslConfigs.SASL_JAAS_CONFIG}" -> sensitiveValue)
+ verifyConfig(brokerResource, OpType.SET, plainJaasConfig,
+ Map(s"listener.name.internal.plain.${SaslConfigs.SASL_JAAS_CONFIG}" -> Password.HIDDEN))
+
+ val plainLoginCallback = Map(s"listener.name.internal.plain.${SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS}" -> "test.LoginClass")
+ verifyConfig(brokerResource, OpType.SET, plainLoginCallback, plainLoginCallback)
+
+ val sslProtocols = Map(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG -> "TLSv1.1")
+ verifyConfig(brokerResource, OpType.APPEND, sslProtocols, Map(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG -> "TLSv1.1"))
+ verifyConfig(brokerResource, OpType.SUBTRACT, sslProtocols, Map(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG -> "TLSv1.1"))
+ val cipherSuites = Map(SslConfigs.SSL_CIPHER_SUITES_CONFIG -> null)
+ verifyConfig(brokerResource, OpType.DELETE, cipherSuites, cipherSuites)
+
+ val customConfig = Map("custom.config" -> sensitiveValue)
+ verifyConfig(brokerResource, OpType.SET, customConfig, Map("custom.config" -> Password.HIDDEN))
+
+ val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "testTopic")
+ val compressionType = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "lz4")
+ verifyConfig(topicResource, OpType.SET, compressionType, compressionType)
+ verifyConfig(topicResource, OpType.SET, customConfig, Map("custom.config" -> Password.HIDDEN))
+ }
+
+ @Test
+ def testNonAlterRequestsNotTransformed(): Unit = {
+ val metadataRequest = request(new MetadataRequest.Builder(List("topic").asJava, true).build())
+ assertSame(metadataRequest.body[MetadataRequest], metadataRequest.loggableRequest)
+ }
+
+ def request(req: AbstractRequest): RequestChannel.Request = {
+ val buffer = req.serialize(new RequestHeader(req.api, req.version, "client-id", 1))
+ val requestContext = newRequestContext(buffer)
+ new network.RequestChannel.Request(processor = 1,
+ requestContext,
+ startTimeNanos = 0,
+ createNiceMock(classOf[MemoryPool]),
+ buffer,
+ createNiceMock(classOf[RequestChannel.Metrics])
+ )
+ }
+
+ private def newRequestContext(buffer: ByteBuffer): RequestContext = {
+ new RequestContext(
+ RequestHeader.parse(buffer),
+ "connection-id",
+ InetAddress.getLoopbackAddress,
+ new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"),
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+ SecurityProtocol.PLAINTEXT,
+ new ClientInformation("name", "version"))
+ }
+
+ private def toMap(config: Config): Map[String, String] = {
+ config.entries.asScala.map(e => e.name -> e.value).toMap
+ }
+
+ private def toMap(config: IncrementalAlterConfigsRequestData.AlterableConfigCollection): Map[String, String] = {
+ config.asScala.map(e => e.name -> e.value).toMap
+ }
+}
\ No newline at end of file