You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/04/09 09:43:00 UTC
[kafka] branch trunk updated: MINOR: A few code cleanups in DynamicBrokerConfig (#12015)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6e9cd0c7f5 MINOR: A few code cleanups in DynamicBrokerConfig (#12015)
6e9cd0c7f5 is described below
commit 6e9cd0c7f58a24b5ecef9746881ebcccd7633a7e
Author: David Jacot <dj...@confluent.io>
AuthorDate: Sat Apr 9 11:42:42 2022 +0200
MINOR: A few code cleanups in DynamicBrokerConfig (#12015)
Reviewers: Luke Chen <sh...@gmail.com>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 30 ++++++++++++----------
1 file changed, 17 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index f726569e84..18eeea6270 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -182,16 +182,18 @@ object DynamicBrokerConfig {
private[server] def dynamicConfigUpdateModes: util.Map[String, String] = {
AllDynamicConfigs.map { name =>
val mode = if (PerBrokerConfigs.contains(name)) "per-broker" else "cluster-wide"
- (name -> mode)
+ name -> mode
}.toMap.asJava
}
private[server] def resolveVariableConfigs(propsOriginal: Properties): Properties = {
val props = new Properties
val config = new AbstractConfig(new ConfigDef(), propsOriginal, false)
- config.originals.asScala.filter(!_._1.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)).foreach {case (key: String, value: Object) => {
- props.put(key, value)
- }}
+ config.originals.forEach { (key, value) =>
+ if (!key.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) {
+ props.put(key, value)
+ }
+ }
props
}
}
@@ -253,7 +255,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
addReconfigurable(kafkaServer.kafkaYammerMetrics)
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
- addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
+ addReconfigurable(new DynamicClientQuotaCallback(kafkaServer))
addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
@@ -421,7 +423,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
debug(s"Dynamic password config $configName could not be decoded using old secret, new secret will be used.")
None
}
- decoded.foreach { value => props.put(configName, passwordEncoder.encode(new Password(value))) }
+ decoded.foreach(value => props.put(configName, passwordEncoder.encode(new Password(value))))
}
}
adminZkClient.changeBrokerConfig(Some(kafkaConfig.brokerId), props)
@@ -487,8 +489,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
* Returns the change in configurations between the new props and current props by returning a
* map of the changed configs, as well as the set of deleted keys
*/
- private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[String, _]):
- (mutable.Map[String, _], Set[String]) = {
+ private def updatedConfigs(newProps: java.util.Map[String, _],
+ currentProps: java.util.Map[String, _]): (mutable.Map[String, _], Set[String]) = {
val changeMap = newProps.asScala.filter {
case (k, v) => v != currentProps.get(k)
}
@@ -597,7 +599,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
newCustomConfigs: util.Map[String, Object],
validateOnly: Boolean): Unit = {
val newConfigs = new util.HashMap[String, Object]
- allNewConfigs.forEach { (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) }
+ allNewConfigs.forEach((k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]))
newConfigs.putAll(newCustomConfigs)
try {
reconfigurable.validateReconfiguration(newConfigs)
@@ -651,7 +653,9 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
logManager.allLogs.foreach { log =>
val props = mutable.Map.empty[Any, Any]
props ++= newBrokerDefaults
- props ++= log.config.originals.asScala.filter { case (k, _) => log.config.overriddenConfigs.contains(k) }
+ props ++= log.config.originals.asScala.filter { case (k, _) =>
+ log.config.overriddenConfigs.contains(k)
+ }
val logConfig = LogConfig(props.asJava, log.config.overriddenConfigs)
log.updateConfig(logConfig)
@@ -797,8 +801,8 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends Reconf
private def createReporters(reporterClasses: util.List[String],
updatedConfigs: util.Map[String, _]): Unit = {
val props = new util.HashMap[String, AnyRef]
- updatedConfigs.forEach { (k, v) => props.put(k, v.asInstanceOf[AnyRef]) }
- propsOverride.forKeyValue { (k, v) => props.put(k, v) }
+ updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef]))
+ propsOverride.forKeyValue((k, v) => props.put(k, v))
val reporters = dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses, classOf[MetricsReporter], props)
// Call notifyMetricsReporters first to satisfy the contract for MetricsReporter.contextChange,
// which provides that MetricsReporter.contextChange must be called before the first call to MetricsReporter.init.
@@ -871,7 +875,7 @@ object DynamicListenerConfig {
)
}
-class DynamicClientQuotaCallback(brokerId: Int, server: KafkaBroker) extends Reconfigurable {
+class DynamicClientQuotaCallback(server: KafkaBroker) extends Reconfigurable {
override def configure(configs: util.Map[String, _]): Unit = {}