You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/23 12:21:12 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

rajinisivaram commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493520165



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + configName
+              if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) &&
+                configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) {
+                val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//")
+                configProps.setProperty(prefixedName, equivalentFileName)
+                processedFiles.add(prefixedName)
+              }
+            })
+        })
+  }
+
+  private[server] def trimSSLStorePaths(configProps: Properties): Boolean = {
+    var fileChanged = false
+    val processedFiles = new mutable.HashSet[String]
+
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+      .foreach {
+        case reconfigurable: ListenerReconfigurable =>
+        ReloadableFileConfigs.foreach(configName => {
+          val prefixedName = reconfigurable.listenerName.configPrefix + configName
+          if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName)) {
+            val configFileName = configProps.getProperty(prefixedName)
+            val equivalentFileName = configFileName.replace("//", "/")
+            if (!configFileName.equals(equivalentFileName)) {
+              fileChanged = true

Review comment:
       This means an update was requested, but not necessarily that the file has changed?

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={

Review comment:
       nit: `SSL` => `Ssl`

##########
File path: core/src/main/scala/kafka/server/ConfigHandler.scala
##########
@@ -203,7 +203,13 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
     if (brokerId == ConfigEntityName.Default)
       brokerConfig.dynamicConfig.updateDefaultConfig(properties)
     else if (brokerConfig.brokerId == brokerId.trim.toInt) {
-      brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
+      val persistentProps = brokerConfig.dynamicConfig.fromPersistentProps(properties, perBrokerConfig = true)
+      // The filepath was changed for equivalent replacement, which means we should reload
+      if (brokerConfig.dynamicConfig.trimSSLStorePaths(persistentProps)) {
+        brokerConfig.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(persistentProps)
+      }

Review comment:
       Can't we put this logic in `DynamicBrokerConfig`?

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + configName
+              if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) &&
+                configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) {
+                val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//")
+                configProps.setProperty(prefixedName, equivalentFileName)
+                processedFiles.add(prefixedName)
+              }
+            })
+        })
+  }
+
+  private[server] def trimSSLStorePaths(configProps: Properties): Boolean = {

Review comment:
       `SSL` => `Ssl`

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + configName
+              if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) &&
+                configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) {
+                val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//")
+                configProps.setProperty(prefixedName, equivalentFileName)
+                processedFiles.add(prefixedName)
+              }
+            })
+        })
+  }
+
+  private[server] def trimSSLStorePaths(configProps: Properties): Boolean = {
+    var fileChanged = false
+    val processedFiles = new mutable.HashSet[String]
+
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+      .foreach {
+        case reconfigurable: ListenerReconfigurable =>
+        ReloadableFileConfigs.foreach(configName => {
+          val prefixedName = reconfigurable.listenerName.configPrefix + configName
+          if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName)) {
+            val configFileName = configProps.getProperty(prefixedName)
+            val equivalentFileName = configFileName.replace("//", "/")
+            if (!configFileName.equals(equivalentFileName)) {
+              fileChanged = true

Review comment:
       This means an update was requested, but not necessarily that the file has changed?

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + configName
+              if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) &&
+                configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) {
+                val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//")

Review comment:
       Does this get reset somewhere or will we keep adding `/`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org