You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/05/05 21:19:16 UTC

[kafka] 02/02: KAFKA-9625: Fix altering and describing dynamic broker configurations (#8260)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e5b5cbea70f5e02767c6df84f4cc7fbfb9288db2
Author: Sanjana Kaundinya <sk...@gmail.com>
AuthorDate: Tue Mar 17 23:02:33 2020 -0700

    KAFKA-9625: Fix altering and describing dynamic broker configurations (#8260)
    
    * Broker throttles were incorrectly marked as sensitive configurations.  Fix this, so that their values can be returned via DescribeConfigs as expected.
    
    * Previously, changes to broker configs that consisted only of deletions were ignored by the brokers because of faulty delta calculation logic that didn't consider deletions as changes, only alterations as changes.  Fix this and add a regression test.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
    (cherry picked from commit 5fc3cd61fcb73da8b52f34b72fe6bb7457f46ce2)
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 33 ++++++---
 .../main/scala/kafka/server/DynamicConfig.scala    |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 22 +++++-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 79 +++++++++++++++++++++-
 4 files changed, 121 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 92aa048..2f47c21 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -468,10 +468,19 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       reconfigurable.reconfigure(newConfig)
   }
 
-  private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[_, _]): mutable.Map[String, _] = {
-    newProps.asScala.filter {
+  /**
+   * Returns the change in configurations between the new props and current props by returning a
+   * map of the changed configs, as well as the set of deleted keys
+   */
+  private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[String, _]):
+  (mutable.Map[String, _], Set[String]) = {
+    val changeMap = newProps.asScala.filter {
       case (k, v) => v != currentProps.get(k)
     }
+    val deletedKeySet = currentProps.asScala.filter {
+      case (k, _) => !newProps.containsKey(k)
+    }.keySet
+    (changeMap, deletedKeySet)
   }
 
   /**
@@ -510,8 +519,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
 
   private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): (KafkaConfig, List[BrokerReconfigurable]) = {
     val newConfig = new KafkaConfig(newProps.asJava, !validateOnly, None)
-    val updatedMap = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals)
-    if (updatedMap.nonEmpty) {
+    val (changeMap, deletedKeySet) = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals)
+    if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
       try {
         val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs
         newConfig.valuesFromThisConfig.keySet.asScala.foreach(customConfigs.remove)
@@ -519,14 +528,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
           case listenerReconfigurable: ListenerReconfigurable =>
             processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false)
           case reconfigurable =>
-            if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet))
-              processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
+            if (needsReconfiguration(reconfigurable.reconfigurableConfigs, changeMap.keySet, deletedKeySet))
+              processReconfigurable(reconfigurable, changeMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
         }
 
         // BrokerReconfigurable updates are processed after config is updated. Only do the validation here.
         val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]()
         brokerReconfigurables.foreach { reconfigurable =>
-          if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet)) {
+          if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) {
             reconfigurable.validateReconfiguration(newConfig)
             if (!validateOnly)
               brokerReconfigurablesToUpdate += reconfigurable
@@ -544,8 +553,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       (currentConfig, List.empty)
   }
 
-  private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: Set[String]): Boolean = {
-    reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty
+  private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: Set[String], deletedKeys: Set[String]): Boolean = {
+    reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty ||
+      reconfigurableConfigs.asScala.intersect(deletedKeys).nonEmpty
   }
 
   private def processListenerReconfigurable(listenerReconfigurable: ListenerReconfigurable,
@@ -556,8 +566,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     val listenerName = listenerReconfigurable.listenerName
     val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
     val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
-    val updatedKeys = updatedConfigs(newValues, oldValues).keySet
-    val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys)
+    val (changeMap, deletedKeys) = updatedConfigs(newValues, oldValues)
+    val updatedKeys = changeMap.keySet
+    val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys, deletedKeys)
     // if `reloadOnly`, reconfigure if configs haven't changed. Otherwise reconfigure if configs have changed
     if (reloadOnly != configsChanged)
       processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index e5974d3..b70579e 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -54,7 +54,7 @@ object DynamicConfig {
       s"This property can be only set dynamically. It is suggested that the limit be kept above 1MB/s for accurate behaviour."
 
     //Definitions
-    private val brokerConfigDef = new ConfigDef()
+    val brokerConfigDef = new ConfigDef()
       //round minimum value down, to make it easier for users.
       .define(LeaderReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, LeaderReplicationThrottledRateDoc)
       .define(FollowerReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, FollowerReplicationThrottledRateDoc)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index dc5c3c7..fe0f8df 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1229,9 +1229,13 @@ object KafkaConfig {
 
   def apply(props: java.util.Map[_, _]): KafkaConfig = new KafkaConfig(props, true)
 
-  def configType(configName: String): Option[ConfigDef.Type] = {
-    def typeOf(name: String): Option[ConfigDef.Type] = Option(configDef.configKeys.get(name)).map(_.`type`)
+  private def typeOf(name: String): Option[ConfigDef.Type] = Option(configDef.configKeys.get(name)).map(_.`type`)
 
+  def configType(configName: String): Option[ConfigDef.Type] = {
+    val configType = configTypeExact(configName)
+    if (configType.isDefined) {
+      return configType
+    }
     typeOf(configName) match {
       case Some(t) => Some(t)
       case None =>
@@ -1239,6 +1243,20 @@ object KafkaConfig {
     }
   }
 
+  private def configTypeExact(exactName: String): Option[ConfigDef.Type] = {
+    val configType = typeOf(exactName).orNull
+    if (configType != null) {
+      Some(configType)
+    } else {
+      val configKey = DynamicConfig.Broker.brokerConfigDef.configKeys().get(exactName)
+      if (configKey != null) {
+        Some(configKey.`type`)
+      } else {
+        None
+      }
+    }
+  }
+
   def maybeSensitive(configType: Option[ConfigDef.Type]): Boolean = {
     // If we can't determine the config entry type, treat it as a sensitive config to be safe
     configType.isEmpty || configType.contains(ConfigDef.Type.PASSWORD)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c33df2d..4a27b2c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -27,7 +27,7 @@ import java.{time, util}
 
 import kafka.log.LogConfig
 import kafka.security.authorizer.AclEntry
-import kafka.server.{Defaults, KafkaConfig, KafkaServer}
+import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
 import kafka.utils.{Log4jController, TestUtils}
 import kafka.zk.KafkaZkClient
@@ -1727,6 +1727,83 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   }
 
   @Test
+  def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(): Unit = {
+    client = Admin.create(createConfig())
+    val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "123"),
+          AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "456"),
+          AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+    TestUtils.waitUntilTrue(() => {
+      val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
+        all().get().get(broker0Resource).entries().asScala.map {
+        case entry => (entry.name, entry.value)
+      }.toMap
+      ("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) &&
+        "456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")))
+    }, "Expected to see the broker properties we just set", pause=25)
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, ""),
+        AlterConfigOp.OpType.DELETE),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "654"),
+          AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "987"),
+          AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+    TestUtils.waitUntilTrue(() => {
+      val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
+        all().get().get(broker0Resource).entries().asScala.map {
+        case entry => (entry.name, entry.value)
+      }.toMap
+      ("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) &&
+        "654".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")) &&
+        "987".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "")))
+    }, "Expected to see the broker properties we just modified", pause=25)
+  }
+
+  @Test
+  def testIncrementalAlterConfigsDeleteBrokerConfigs(): Unit = {
+    client = Admin.create(createConfig())
+    val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0")
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "123"),
+        AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "456"),
+          AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "789"),
+          AlterConfigOp.OpType.SET)
+      ).asJavaCollection).asJava).all().get()
+    TestUtils.waitUntilTrue(() => {
+      val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
+        all().get().get(broker0Resource).entries().asScala.map {
+        case entry => (entry.name, entry.value)
+      }.toMap
+      ("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) &&
+        "456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")) &&
+        "789".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "")))
+    }, "Expected to see the broker properties we just set", pause=25)
+    client.incrementalAlterConfigs(Map(broker0Resource ->
+      Seq(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, ""),
+        AlterConfigOp.OpType.DELETE),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, ""),
+          AlterConfigOp.OpType.DELETE),
+        new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, ""),
+          AlterConfigOp.OpType.DELETE)
+      ).asJavaCollection).asJava).all().get()
+    TestUtils.waitUntilTrue(() => {
+      val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava).
+        all().get().get(broker0Resource).entries().asScala.map {
+        case entry => (entry.name, entry.value)
+      }.toMap
+      ("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) &&
+        "".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")) &&
+        "".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, "")))
+    }, "Expected to see the broker properties we just removed to be deleted", pause=25)
+  }
+
+  @Test
   def testInvalidIncrementalAlterConfigs(): Unit = {
     client = Admin.create(createConfig)