You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/05/05 21:19:14 UTC

[kafka] branch 2.5 updated (89317c6 -> e5b5cbe)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a change to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 89317c6  KAFKA-9633: Ensure ConfigProviders are closed (#8204)
     new 177abc7  KAFKA-9718; Don't log passwords for AlterConfigs in request logs (#8294)
     new e5b5cbe  KAFKA-9625: Fix altering and describing dynamic broker configurations (#8260)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core/src/main/scala/kafka/log/LogConfig.scala      |   4 +
 .../main/scala/kafka/network/RequestChannel.scala  |  54 +++++-
 .../src/main/scala/kafka/server/AdminManager.scala |  21 +--
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  39 +++--
 .../main/scala/kafka/server/DynamicConfig.scala    |   2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  32 ++++
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  79 ++++++++-
 .../unit/kafka/network/RequestChannelTest.scala    | 195 +++++++++++++++++++++
 8 files changed, 396 insertions(+), 30 deletions(-)
 create mode 100644 core/src/test/scala/unit/kafka/network/RequestChannelTest.scala


[kafka] 01/02: KAFKA-9718; Don't log passwords for AlterConfigs in request logs (#8294)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 177abc77ee29721164b8633a62d7b259eca4c7b3
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Mar 13 18:24:03 2020 +0000

    KAFKA-9718; Don't log passwords for AlterConfigs in request logs (#8294)
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
    (cherry picked from commit f165cdc325388883541db381c4bdfd30da089b3b)
---
 core/src/main/scala/kafka/log/LogConfig.scala      |   4 +
 .../main/scala/kafka/network/RequestChannel.scala  |  54 +++++-
 .../src/main/scala/kafka/server/AdminManager.scala |  21 +--
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   6 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  14 ++
 .../unit/kafka/network/RequestChannelTest.scala    | 195 +++++++++++++++++++++
 6 files changed, 277 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 4f26716..59fe81c 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -299,6 +299,10 @@ object LogConfig {
 
   def serverConfigName(configName: String): Option[String] = configDef.serverConfigName(configName)
 
+  def configType(configName: String): Option[ConfigDef.Type] = {
+    Option(configDef.configKeys.get(configName)).map(_.`type`)
+  }
+
   /**
    * Create a log config instance using the given properties and defaults
    */
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 531cac1..c9a7d03 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -23,9 +23,15 @@ import java.util.concurrent._
 
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.Meter
+import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
 import kafka.utils.{Logging, NotNothing, Pool}
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
@@ -98,7 +104,7 @@ object RequestChannel extends Logging {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- ${body[AbstractRequest].toString(details)}"
+    def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
 
     def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
       bodyAndSize.request match {
@@ -108,6 +114,52 @@ object RequestChannel extends Logging {
       }
     }
 
+    def loggableRequest: AbstractRequest = {
+
+      def loggableValue(resourceType: ConfigResource.Type, name: String, value: String): String = {
+        val maybeSensitive = resourceType match {
+          case ConfigResource.Type.BROKER => KafkaConfig.maybeSensitive(KafkaConfig.configType(name))
+          case ConfigResource.Type.TOPIC => KafkaConfig.maybeSensitive(LogConfig.configType(name))
+          case ConfigResource.Type.BROKER_LOGGER => false
+          case _ => true
+        }
+        if (maybeSensitive) Password.HIDDEN else value
+      }
+
+      bodyAndSize.request match {
+        case alterConfigs: AlterConfigsRequest =>
+          val loggableConfigs = alterConfigs.configs().asScala.map { case (resource, config) =>
+            val loggableEntries = new AlterConfigsRequest.Config(config.entries.asScala.map { entry =>
+                new AlterConfigsRequest.ConfigEntry(entry.name, loggableValue(resource.`type`, entry.name, entry.value))
+            }.asJavaCollection)
+            (resource, loggableEntries)
+          }.asJava
+          new AlterConfigsRequest.Builder(loggableConfigs, alterConfigs.validateOnly).build(alterConfigs.version())
+
+        case alterConfigs: IncrementalAlterConfigsRequest =>
+          val resources = new AlterConfigsResourceCollection(alterConfigs.data.resources.size)
+          alterConfigs.data().resources().asScala.foreach { resource =>
+            val newResource = new AlterConfigsResource()
+              .setResourceName(resource.resourceName)
+              .setResourceType(resource.resourceType)
+            resource.configs.asScala.foreach { config =>
+              newResource.configs.add(new AlterableConfig()
+                .setName(config.name)
+                .setValue(loggableValue(ConfigResource.Type.forId(resource.resourceType), config.name, config.value))
+                .setConfigOperation(config.configOperation))
+            }
+            resources.add(newResource)
+          }
+          val data = new IncrementalAlterConfigsRequestData()
+            .setValidateOnly(alterConfigs.data().validateOnly())
+            .setResources(resources)
+          new IncrementalAlterConfigsRequest.Builder(data).build(alterConfigs.version)
+
+        case _ =>
+          bodyAndSize.request
+      }
+    }
+
     trace(s"Processor $processor received request: ${requestDesc(true)}")
 
     def requestThreadTimeNanos: Long = {
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 43f6e46..72b8d98 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -636,14 +636,6 @@ class AdminManager(val config: KafkaConfig,
     DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = true)
   }
 
-  private def configType(name: String, synonyms: List[String]): ConfigDef.Type = {
-    val configType = config.typeOf(name)
-    if (configType != null)
-      configType
-    else
-      synonyms.iterator.map(config.typeOf).find(_ != null).orNull
-  }
-
   private def configSynonyms(name: String, synonyms: List[String], isSensitive: Boolean): List[DescribeConfigsResponse.ConfigSynonym] = {
     val dynamicConfig = config.dynamicConfig
     val allSynonyms = mutable.Buffer[DescribeConfigsResponse.ConfigSynonym]()
@@ -664,9 +656,9 @@ class AdminManager(val config: KafkaConfig,
 
   private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean)
                                     (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
-    val configEntryType = logConfig.typeOf(name)
-    val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
-    val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)
+    val configEntryType = LogConfig.configType(name)
+    val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
+    val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull)
     val allSynonyms = {
       val list = LogConfig.TopicConfigSynonyms.get(name)
         .map(s => configSynonyms(s, brokerSynonyms(s), isSensitive))
@@ -684,14 +676,13 @@ class AdminManager(val config: KafkaConfig,
   private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean)
                                      (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
     val allNames = brokerSynonyms(name)
-    val configEntryType = configType(name, allNames)
-    // If we can't determine the config entry type, treat it as a sensitive config to be safe
-    val isSensitive = configEntryType == ConfigDef.Type.PASSWORD || configEntryType == null
+    val configEntryType = KafkaConfig.configType(name)
+    val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
     val valueAsString = if (isSensitive)
       null
     else value match {
       case v: String => v
-      case _ => ConfigDef.convertToString(value, configEntryType)
+      case _ => ConfigDef.convertToString(value, configEntryType.orNull)
     }
     val allSynonyms = configSynonyms(name, allNames, isSensitive)
         .filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 7508184..92aa048 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -87,7 +87,11 @@ object DynamicBrokerConfig {
   private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp)
   private val PerBrokerConfigs = DynamicSecurityConfigs  ++
     DynamicListenerConfig.ReconfigurableConfigs -- ClusterLevelListenerConfigs
-  private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp)
+  private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp,
+    KafkaConfig.SaslLoginCallbackHandlerClassProp,
+    KafkaConfig.SaslLoginClassProp,
+    KafkaConfig.SaslServerCallbackHandlerClassProp,
+    KafkaConfig.ConnectionsMaxReauthMsProp)
 
   private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8650041..dc5c3c7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1229,6 +1229,20 @@ object KafkaConfig {
 
   def apply(props: java.util.Map[_, _]): KafkaConfig = new KafkaConfig(props, true)
 
+  def configType(configName: String): Option[ConfigDef.Type] = {
+    def typeOf(name: String): Option[ConfigDef.Type] = Option(configDef.configKeys.get(name)).map(_.`type`)
+
+    typeOf(configName) match {
+      case Some(t) => Some(t)
+      case None =>
+        DynamicBrokerConfig.brokerConfigSynonyms(configName, matchListenerOverride = true).flatMap(typeOf).headOption
+    }
+  }
+
+  def maybeSensitive(configType: Option[ConfigDef.Type]): Boolean = {
+    // If we can't determine the config entry type, treat it as a sensitive config to be safe
+    configType.isEmpty || configType.contains(ConfigDef.Type.PASSWORD)
+  }
 }
 
 class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig])
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
new file mode 100644
index 0000000..7f570e6
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+
+import java.net.InetAddress
+import java.nio.ByteBuffer
+import java.util.Collections
+
+import kafka.network
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, SslConfigs, TopicConfig}
+import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.AlterConfigsRequest._
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.easymock.EasyMock._
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+class RequestChannelTest {
+
+  @Test
+  def testAlterRequests(): Unit = {
+
+    val sensitiveValue = "secret"
+    def verifyConfig(resource: ConfigResource, entries: Seq[ConfigEntry], expectedValues: Map[String, String]): Unit = {
+      val alterConfigs = request(new AlterConfigsRequest.Builder(Collections.singletonMap(resource,
+        new Config(entries.asJavaCollection)), true).build())
+      val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
+      val loggedConfig = loggableAlterConfigs.configs.get(resource)
+      assertEquals(expectedValues, toMap(loggedConfig))
+      val alterConfigsDesc = alterConfigs.requestDesc(details = true)
+      assertFalse(s"Sensitive config logged $alterConfigsDesc", alterConfigsDesc.contains(sensitiveValue))
+    }
+
+    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+    val keystorePassword = new ConfigEntry(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sensitiveValue)
+    verifyConfig(brokerResource, Seq(keystorePassword), Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
+
+    val keystoreLocation = new ConfigEntry(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore")
+    verifyConfig(brokerResource, Seq(keystoreLocation), Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore"))
+    verifyConfig(brokerResource, Seq(keystoreLocation, keystorePassword),
+      Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore", SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
+
+    val listenerKeyPassword = new ConfigEntry(s"listener.name.internal.${SslConfigs.SSL_KEY_PASSWORD_CONFIG}", sensitiveValue)
+    verifyConfig(brokerResource, Seq(listenerKeyPassword), Map(listenerKeyPassword.name -> Password.HIDDEN))
+
+    val listenerKeystore = new ConfigEntry(s"listener.name.internal.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}", "/path/to/keystore")
+    verifyConfig(brokerResource, Seq(listenerKeystore), Map(listenerKeystore.name -> "/path/to/keystore"))
+
+    val plainJaasConfig = new ConfigEntry(s"listener.name.internal.plain.${SaslConfigs.SASL_JAAS_CONFIG}", sensitiveValue)
+    verifyConfig(brokerResource, Seq(plainJaasConfig), Map(plainJaasConfig.name -> Password.HIDDEN))
+
+    val plainLoginCallback = new ConfigEntry(s"listener.name.internal.plain.${SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS}", "test.LoginClass")
+    verifyConfig(brokerResource, Seq(plainLoginCallback), Map(plainLoginCallback.name -> plainLoginCallback.value))
+
+    val customConfig = new ConfigEntry("custom.config", sensitiveValue)
+    verifyConfig(brokerResource, Seq(customConfig), Map(customConfig.name -> Password.HIDDEN))
+
+    val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "testTopic")
+    val compressionType = new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
+    verifyConfig(topicResource, Seq(compressionType), Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "lz4"))
+    verifyConfig(topicResource, Seq(customConfig), Map(customConfig.name -> Password.HIDDEN))
+
+    // Verify empty request
+    val alterConfigs = request(new AlterConfigsRequest.Builder(Collections.emptyMap[ConfigResource, Config], true).build())
+    assertEquals(Collections.emptyMap, alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest].configs)
+  }
+
+  @Test
+  def testIncrementalAlterRequests(): Unit = {
+
+    def incrementalAlterConfigs(resource: ConfigResource,
+                                entries: Map[String, String], op: OpType): IncrementalAlterConfigsRequest = {
+      val data = new IncrementalAlterConfigsRequestData()
+      val alterableConfigs = new AlterableConfigCollection()
+      entries.foreach { case (name, value) =>
+        alterableConfigs.add(new AlterableConfig().setName(name).setValue(value).setConfigOperation(op.id))
+      }
+      data.resources.add(new AlterConfigsResource()
+        .setResourceName(resource.name).setResourceType(resource.`type`.id)
+        .setConfigs(alterableConfigs))
+      new IncrementalAlterConfigsRequest.Builder(data).build()
+    }
+
+    val sensitiveValue = "secret"
+    def verifyConfig(resource: ConfigResource,
+                     op: OpType,
+                     entries: Map[String, String],
+                     expectedValues: Map[String, String]): Unit = {
+      val alterConfigs = request(incrementalAlterConfigs(resource, entries, op))
+      val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
+      val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs
+      assertEquals(expectedValues, toMap(loggedConfig))
+      val alterConfigsDesc = alterConfigs.requestDesc(details = true)
+      assertFalse(s"Sensitive config logged $alterConfigsDesc", alterConfigsDesc.contains(sensitiveValue))
+    }
+
+    val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "1")
+    val keystorePassword = Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> sensitiveValue)
+    verifyConfig(brokerResource, OpType.SET, keystorePassword, Map(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
+
+    val keystoreLocation = Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore")
+    verifyConfig(brokerResource, OpType.SET, keystoreLocation, keystoreLocation)
+    verifyConfig(brokerResource, OpType.SET, keystoreLocation ++ keystorePassword,
+      Map(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> "/path/to/keystore", SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> Password.HIDDEN))
+
+    val listenerKeyPassword = Map(s"listener.name.internal.${SslConfigs.SSL_KEY_PASSWORD_CONFIG}" -> sensitiveValue)
+    verifyConfig(brokerResource, OpType.SET, listenerKeyPassword,
+      Map(s"listener.name.internal.${SslConfigs.SSL_KEY_PASSWORD_CONFIG}" -> Password.HIDDEN))
+
+    val listenerKeystore = Map(s"listener.name.internal.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "/path/to/keystore")
+    verifyConfig(brokerResource, OpType.SET, listenerKeystore, listenerKeystore)
+
+    val plainJaasConfig = Map(s"listener.name.internal.plain.${SaslConfigs.SASL_JAAS_CONFIG}" -> sensitiveValue)
+    verifyConfig(brokerResource, OpType.SET, plainJaasConfig,
+      Map(s"listener.name.internal.plain.${SaslConfigs.SASL_JAAS_CONFIG}" -> Password.HIDDEN))
+
+    val plainLoginCallback = Map(s"listener.name.internal.plain.${SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS}" -> "test.LoginClass")
+    verifyConfig(brokerResource, OpType.SET, plainLoginCallback, plainLoginCallback)
+
+    val sslProtocols = Map(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG -> "TLSv1.1")
+    verifyConfig(brokerResource, OpType.APPEND, sslProtocols, Map(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG -> "TLSv1.1"))
+    verifyConfig(brokerResource, OpType.SUBTRACT, sslProtocols, Map(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG -> "TLSv1.1"))
+    val cipherSuites = Map(SslConfigs.SSL_CIPHER_SUITES_CONFIG -> null)
+    verifyConfig(brokerResource, OpType.DELETE, cipherSuites, cipherSuites)
+
+    val customConfig = Map("custom.config" -> sensitiveValue)
+    verifyConfig(brokerResource, OpType.SET, customConfig, Map("custom.config" -> Password.HIDDEN))
+
+    val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "testTopic")
+    val compressionType = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "lz4")
+    verifyConfig(topicResource, OpType.SET, compressionType, compressionType)
+    verifyConfig(topicResource, OpType.SET, customConfig, Map("custom.config" -> Password.HIDDEN))
+  }
+
+  @Test
+  def testNonAlterRequestsNotTransformed(): Unit = {
+    val metadataRequest = request(new MetadataRequest.Builder(List("topic").asJava, true).build())
+    assertSame(metadataRequest.body[MetadataRequest], metadataRequest.loggableRequest)
+  }
+
+  def request(req: AbstractRequest): RequestChannel.Request = {
+    val buffer = req.serialize(new RequestHeader(req.api, req.version, "client-id", 1))
+    val requestContext = newRequestContext(buffer)
+    new network.RequestChannel.Request(processor = 1,
+      requestContext,
+      startTimeNanos = 0,
+      createNiceMock(classOf[MemoryPool]),
+      buffer,
+      createNiceMock(classOf[RequestChannel.Metrics])
+    )
+  }
+
+  private def newRequestContext(buffer: ByteBuffer): RequestContext = {
+    new RequestContext(
+      RequestHeader.parse(buffer),
+      "connection-id",
+      InetAddress.getLoopbackAddress,
+      new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"),
+      ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+      SecurityProtocol.PLAINTEXT,
+      new ClientInformation("name", "version"))
+  }
+
+  private def toMap(config: Config): Map[String, String] = {
+    config.entries.asScala.map(e => e.name -> e.value).toMap
+  }
+
+  private def toMap(config: IncrementalAlterConfigsRequestData.AlterableConfigCollection): Map[String, String] = {
+    config.asScala.map(e => e.name -> e.value).toMap
+  }
+}
\ No newline at end of file


[kafka] 02/02: KAFKA-9625: Fix altering and describing dynamic broker configurations (#8260)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e5b5cbea70f5e02767c6df84f4cc7fbfb9288db2
Author: Sanjana Kaundinya <sk...@gmail.com>
AuthorDate: Tue Mar 17 23:02:33 2020 -0700

    KAFKA-9625: Fix altering and describing dynamic broker configurations (#8260)
    
    * Broker throttles were incorrectly marked as sensitive configurations.  Fix this, so that their values can be returned via DescribeConfigs as expected.
    
    * Previously, changes to broker configs that consisted only of deletions were ignored by the brokers because of faulty delta calculation logic that didn't consider deletions as changes, only alterations as changes.  Fix this and add a regression test.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
    (cherry picked from commit 5fc3cd61fcb73da8b52f34b72fe6bb7457f46ce2)
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 33 ++++++---
 .../main/scala/kafka/server/DynamicConfig.scala    |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 22 +++++-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 79 +++++++++++++++++++++-
 4 files changed, 121 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 92aa048..2f47c21 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -468,10 +468,19 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       reconfigurable.reconfigure(newConfig)
   }
 
-  private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[_, _]): mutable.Map[String, _] = {
-    newProps.asScala.filter {
+  /**
+   * Returns the change in configurations between the new props and current props by returning a
+   * map of the changed configs, as well as the set of deleted keys
+   */
+  private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[String, _]):
+  (mutable.Map[String, _], Set[String]) = {
+    val changeMap = newProps.asScala.filter {
       case (k, v) => v != currentProps.get(k)
     }
+    val deletedKeySet = currentProps.asScala.filter {
+      case (k, _) => !newProps.containsKey(k)
+    }.keySet
+    (changeMap, deletedKeySet)
   }
 
   /**
@@ -510,8 +519,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
 
   private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): (KafkaConfig, List[BrokerReconfigurable]) = {
     val newConfig = new KafkaConfig(newProps.asJava, !validateOnly, None)
-    val updatedMap = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals)
-    if (updatedMap.nonEmpty) {
+    val (changeMap, deletedKeySet) = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals)
+    if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
       try {
         val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs
         newConfig.valuesFromThisConfig.keySet.asScala.foreach(customConfigs.remove)
@@ -519,14 +528,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
           case listenerReconfigurable: ListenerReconfigurable =>
             processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false)
           case reconfigurable =>
-            if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet))
-              processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
+            if (needsReconfiguration(reconfigurable.reconfigurableConfigs, changeMap.keySet, deletedKeySet))
+              processReconfigurable(reconfigurable, changeMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
         }
 
         // BrokerReconfigurable updates are processed after config is updated. Only do the validation here.
         val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]()
         brokerReconfigurables.foreach { reconfigurable =>
-          if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet)) {
+          if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) {
             reconfigurable.validateReconfiguration(newConfig)
             if (!validateOnly)
               brokerReconfigurablesToUpdate += reconfigurable
@@ -544,8 +553,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       (currentConfig, List.empty)
   }
 
-  private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: Set[String]): Boolean = {
-    reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty
+  private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: Set[String], deletedKeys: Set[String]): Boolean = {
+    reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty ||
+      reconfigurableConfigs.asScala.intersect(deletedKeys).nonEmpty
   }
 
   private def processListenerReconfigurable(listenerReconfigurable: ListenerReconfigurable,
@@ -556,8 +566,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     val listenerName = listenerReconfigurable.listenerName
     val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
     val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
-    val updatedKeys = updatedConfigs(newValues, oldValues).keySet
-    val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys)
+    val (changeMap, deletedKeys) = updatedConfigs(newValues, oldValues)
+    val updatedKeys = changeMap.keySet
+    val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys, deletedKeys)
     // if `reloadOnly`, reconfigure if configs haven't changed. Otherwise reconfigure if configs have changed
     if (reloadOnly != configsChanged)
       processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index e5974d3..b70579e 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -54,7 +54,7 @@ object DynamicConfig {
       s"This property can be only set dynamically. It is suggested that the limit be kept above 1MB/s for accurate behaviour."
 
     //Definitions
-    private val brokerConfigDef = new ConfigDef()
+    val brokerConfigDef = new ConfigDef()
       //round minimum value down, to make it easier for users.
       .define(LeaderReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, LeaderReplicationThrottledRateDoc)
       .define(FollowerReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, FollowerReplicationThrottledRateDoc)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index dc5c3c7..fe0f8df 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1229,9 +1229,13 @@ object KafkaConfig {
 
   def apply(props: java.util.Map[_, _]): KafkaConfig = new KafkaConfig(props, true)
 
-  def configType(configName: String): Option[ConfigDef.Type] = {
-    def typeOf(name: String): Option[ConfigDef.Type] = Option(configDef.configKeys.get(name)).map(_.`type`)
+  private def typeOf(name: String): Option[ConfigDef.Type] = Option(configDef.configKeys.get(name)).map(_.`type`)
 
+  def configType(configName: String): Option[ConfigDef.Type] = {
+    val configType = configTypeExact(configName)
+    if (configType.isDefined) {
+      return configType
+    }
     typeOf(configName) match {
       case Some(t) => Some(t)
       case None =>
@@ -1239,6 +1243,20 @@ object KafkaConfig {
     }
   }
 
+  private def configTypeExact(exactName: String): Option[ConfigDef.Type] = {
+    val configType = typeOf(exactName).orNull
+    if (configType != null) {
+      Some(configType)
+    } else {
+      val configKey = DynamicConfig.Broker.brokerConfigDef.configKeys().get(exactName)
+      if (configKey != null) {
+        Some(configKey.`type`)
+      } else {
+        None
+      }
+    }
+  }
+
   def maybeSensitive(configType: Option[ConfigDef.Type]): Boolean = {
     // If we can't determine the config entry type, treat it as a sensitive config to be safe
     configType.isEmpty || configType.contains(ConfigDef.Type.PASSWORD)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c33df2d..4a27b2c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -27,7 +27,7 @@ import java.{time, util}
 
 import kafka.log.LogConfig
 import kafka.security.authorizer.AclEntry
-import kafka.server.{Defaults, KafkaConfig, KafkaServer}
+import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
 import kafka.utils.{Log4jController, TestUtils}
 import kafka.zk.KafkaZkClient
@@ -1727,6 +1727,83 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   }
 
   @Test
+  def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(): Unit = {
+    client = Admin.create(createConfig())
+    val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "123"),
+          AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "456"),
+          AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+    TestUtils.waitUntilTrue(() => {
+      val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
+        all().get().get(broker0Resource).entries().asScala.map {
+        case entry => (entry.name, entry.value)
+      }.toMap
+      ("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) &&
+        "456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")))
+    }, "Expected to see the broker properties we just set", pause=25)
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, ""),
+        AlterConfigOp.OpType.DELETE),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "654"),
+          AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "987"),
+          AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+    TestUtils.waitUntilTrue(() => {
+      val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
+        all().get().get(broker0Resource).entries().asScala.map {
+        case entry => (entry.name, entry.value)
+      }.toMap
+      ("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) &&
+        "654".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")) &&
+        "987".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "")))
+    }, "Expected to see the broker properties we just modified", pause=25)
+  }
+
+  @Test
+  def testIncrementalAlterConfigsDeleteBrokerConfigs(): Unit = {
+    client = Admin.create(createConfig())
+    val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "123"),
+        AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "456"),
+          AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "789"),
+          AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+    TestUtils.waitUntilTrue(() => {
+      val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
+        all().get().get(broker0Resource).entries().asScala.map {
+        case entry => (entry.name, entry.value)
+      }.toMap
+      ("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) &&
+        "456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")) &&
+        "789".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "")))
+    }, "Expected to see the broker properties we just set", pause=25)
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, ""),
+        AlterConfigOp.OpType.DELETE),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, ""),
+          AlterConfigOp.OpType.DELETE),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, ""),
+          AlterConfigOp.OpType.DELETE)
+      ).asJavaCollection).asJava).all().get()
+    TestUtils.waitUntilTrue(() => {
+      val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
+        all().get().get(broker0Resource).entries().asScala.map {
+        case entry => (entry.name, entry.value)
+      }.toMap
+      ("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) &&
+        "".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")) &&
+        "".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "")))
+    }, "Expected to see the broker properties we just removed to be deleted", pause=25)
+  }
+
+  @Test
   def testInvalidIncrementalAlterConfigs(): Unit = {
     client = Admin.create(createConfig)