You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/04/09 09:43:00 UTC

[kafka] branch trunk updated: MINOR: A few code cleanups in DynamicBrokerConfig (#12015)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6e9cd0c7f5 MINOR: A few code cleanups in DynamicBrokerConfig (#12015)
6e9cd0c7f5 is described below

commit 6e9cd0c7f58a24b5ecef9746881ebcccd7633a7e
Author: David Jacot <dj...@confluent.io>
AuthorDate: Sat Apr 9 11:42:42 2022 +0200

    MINOR: A few code cleanups in DynamicBrokerConfig (#12015)
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 30 ++++++++++++----------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index f726569e84..18eeea6270 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -182,16 +182,18 @@ object DynamicBrokerConfig {
   private[server] def dynamicConfigUpdateModes: util.Map[String, String] = {
     AllDynamicConfigs.map { name =>
       val mode = if (PerBrokerConfigs.contains(name)) "per-broker" else "cluster-wide"
-      (name -> mode)
+      name -> mode
     }.toMap.asJava
   }
 
   private[server] def resolveVariableConfigs(propsOriginal: Properties): Properties = {
     val props = new Properties
     val config = new AbstractConfig(new ConfigDef(), propsOriginal, false)
-    config.originals.asScala.filter(!_._1.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)).foreach {case (key: String, value: Object) => {
-      props.put(key, value)
-    }}
+    config.originals.forEach { (key, value) =>
+      if (!key.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) {
+        props.put(key, value)
+      }
+    }
     props
   }
 }
@@ -253,7 +255,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     }
     addReconfigurable(kafkaServer.kafkaYammerMetrics)
     addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
-    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
+    addReconfigurable(new DynamicClientQuotaCallback(kafkaServer))
 
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
     addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
@@ -421,7 +423,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
                 debug(s"Dynamic password config $configName could not be decoded using old secret, new secret will be used.")
                 None
             }
-            decoded.foreach { value => props.put(configName, passwordEncoder.encode(new Password(value))) }
+            decoded.foreach(value => props.put(configName, passwordEncoder.encode(new Password(value))))
           }
         }
         adminZkClient.changeBrokerConfig(Some(kafkaConfig.brokerId), props)
@@ -487,8 +489,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
    * Returns the change in configurations between the new props and current props by returning a
    * map of the changed configs, as well as the set of deleted keys
    */
-  private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[String, _]):
-  (mutable.Map[String, _], Set[String]) = {
+  private def updatedConfigs(newProps: java.util.Map[String, _],
+                             currentProps: java.util.Map[String, _]): (mutable.Map[String, _], Set[String]) = {
     val changeMap = newProps.asScala.filter {
       case (k, v) => v != currentProps.get(k)
     }
@@ -597,7 +599,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
                                     newCustomConfigs: util.Map[String, Object],
                                     validateOnly: Boolean): Unit = {
     val newConfigs = new util.HashMap[String, Object]
-    allNewConfigs.forEach { (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) }
+    allNewConfigs.forEach((k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]))
     newConfigs.putAll(newCustomConfigs)
     try {
       reconfigurable.validateReconfiguration(newConfigs)
@@ -651,7 +653,9 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
     logManager.allLogs.foreach { log =>
       val props = mutable.Map.empty[Any, Any]
       props ++= newBrokerDefaults
-      props ++= log.config.originals.asScala.filter { case (k, _) => log.config.overriddenConfigs.contains(k) }
+      props ++= log.config.originals.asScala.filter { case (k, _) =>
+        log.config.overriddenConfigs.contains(k)
+      }
 
       val logConfig = LogConfig(props.asJava, log.config.overriddenConfigs)
       log.updateConfig(logConfig)
@@ -797,8 +801,8 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconf
   private def createReporters(reporterClasses: util.List[String],
                               updatedConfigs: util.Map[String, _]): Unit = {
     val props = new util.HashMap[String, AnyRef]
-    updatedConfigs.forEach { (k, v) => props.put(k, v.asInstanceOf[AnyRef]) }
-    propsOverride.forKeyValue { (k, v) => props.put(k, v) }
+    updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef]))
+    propsOverride.forKeyValue((k, v) => props.put(k, v))
     val reporters = dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses, classOf[MetricsReporter], props)
     // Call notifyMetricsReporters first to satisfy the contract for MetricsReporter.contextChange,
     // which provides that MetricsReporter.contextChange must be called before the first call to MetricsReporter.init.
@@ -871,7 +875,7 @@ object DynamicListenerConfig {
   )
 }
 
-class DynamicClientQuotaCallback(brokerId: Int, server: KafkaBroker) extends Reconfigurable {
+class DynamicClientQuotaCallback(server: KafkaBroker) extends Reconfigurable {
 
   override def configure(configs: util.Map[String, _]): Unit = {}