Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/28 21:07:55 UTC

[GitHub] [kafka] hachikuji opened a new pull request #11448: KAFKA-13417; Ensure dynamic reconfigurations set old config properly

hachikuji opened a new pull request #11448:
URL: https://github.com/apache/kafka/pull/11448


   This patch fixes a bug in `DynamicBrokerConfig` which causes some configuration changes to be ignored. In particular, the bug is the result of the reference to the old configuration getting indirectly mutated. This causes the first dynamic configuration update to pass effectively the same configuration as both `oldConfig` and `newConfig`. In cases such as in `DynamicThreadPool`, the update is ignored because the old configuration value matches the new configuration value.
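
The aliasing described above can be reproduced in a small model. This is only a sketch under stated assumptions: `Config`, `Manager`, `copyOnInit`, and `firstUpdate` are hypothetical names for illustration and are not Kafka's classes. It assumes that the root config reads through to whatever the manager currently holds, which is one way an "old" reference can appear to change.

```scala
// Simplified, hypothetical model of the aliasing problem (not Kafka's real classes).
object AliasingSketch {
  // A config whose reads follow whatever its manager currently points at,
  // so a reference to the root object always "sees" the latest values.
  final class Config(val own: Map[String, Int]) {
    var manager: Manager = null
    def get(key: String): Int = {
      val current = if (manager == null) this else manager.current
      if (current eq this) own(key) else current.get(key)
    }
  }

  final class Manager(root: Config, copyOnInit: Boolean) {
    // With copyOnInit = false the manager starts out aliasing the root config.
    var current: Config = if (copyOnInit) new Config(root.own) else root

    // Applies one update and returns (old value, new value) as a listener would see them.
    def update(key: String, value: Int): (Int, Int) = {
      val oldConfig = current
      val newConfig = new Config(oldConfig.own + (key -> value))
      current = newConfig // a root-aliased oldConfig now reads through to newConfig
      (oldConfig.get(key), newConfig.get(key))
    }
  }

  // Runs the first dynamic update against a fresh root config.
  def firstUpdate(copyOnInit: Boolean): (Int, Int) = {
    val root = new Config(Map("num.io.threads" -> 8))
    val manager = new Manager(root, copyOnInit)
    root.manager = manager
    manager.update("num.io.threads", 16)
  }
}
```

In this model, `AliasingSketch.firstUpdate(copyOnInit = false)` returns `(16, 16)`, so a listener that compares old and new values would skip the change, while `firstUpdate(copyOnInit = true)` returns `(8, 16)`.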
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a change in pull request #11448: KAFKA-13417; Ensure dynamic reconfigurations set old config properly

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #11448:
URL: https://github.com/apache/kafka/pull/11448#discussion_r745938340



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -204,16 +204,19 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val reconfigurables = mutable.Buffer[Reconfigurable]()
   private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
-  private var currentConfig = kafkaConfig
+  private var currentConfig: KafkaConfig = null
   private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
 
-  private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+  private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {

Review comment:
       In the ZK world, all ZK configs are static configs. We initialize dynamic configs very early on, once we have a ZK client that we create with ZK configs.
   
   For the non-ZK world, I wasn't sure if the initialization required some dynamic configs. For example, we allow SSL keystore passwords to be stored in ZK prior to starting up brokers to avoid specifying any passwords in server.properties. Do we ensure that SocketServer only starts up after dynamic configs are initialized with KRaft?







[GitHub] [kafka] hachikuji commented on a change in pull request #11448: KAFKA-13417; Ensure dynamic reconfigurations set old config properly

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11448:
URL: https://github.com/apache/kafka/pull/11448#discussion_r745021117



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -516,7 +516,15 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     newProps ++= staticBrokerConfigs
     overrideProps(newProps, dynamicDefaultConfigs)
     overrideProps(newProps, dynamicBrokerConfigs)
-    val oldConfig = currentConfig
+
+    // We need a copy of the current config since `currentConfig` is initialized with `kafkaConfig`
+    // which means the call to `updateCurrentConfig` would end up mutating `oldConfig`.
+    val oldConfig = if (kafkaConfig eq currentConfig) {

Review comment:
       I pushed a patch which initializes `currentConfig` as null in order to make the call to `initialize` required. Let me know if that seems like a reasonable approach.
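
A minimal sketch of the "make `initialize` required" idea follows. The names `Settings`, `DynamicSettings`, and `currentSettings` are hypothetical, and the explicit `IllegalStateException` is an assumption added for illustration; the thread only says that `currentConfig` starts as null.

```scala
// Hypothetical sketch: state stays unset until initialize() runs, and reads fail fast.
object InitializeSketch {
  final case class Settings(values: Map[String, String])

  final class DynamicSettings(static: Settings) {
    // Deliberately null until initialize() is called.
    private var current: Settings = null

    // Builds a fresh Settings object, so `current` never aliases `static`.
    def initialize(overrides: Option[Map[String, String]]): Unit = {
      current = Settings(static.values ++ overrides.getOrElse(Map.empty[String, String]))
    }

    // Assumption for this sketch: reject reads that happen before initialization.
    def currentSettings: Settings = {
      if (current == null)
        throw new IllegalStateException("initialize() must be called first")
      current
    }
  }
}
```

Because `initialize` always allocates a new object, the value later captured as the old config is a distinct snapshot instead of the original static instance.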







[GitHub] [kafka] hachikuji commented on a change in pull request #11448: KAFKA-13417; Ensure dynamic reconfigurations set old config properly

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11448:
URL: https://github.com/apache/kafka/pull/11448#discussion_r745944930



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -204,16 +204,19 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val reconfigurables = mutable.Buffer[Reconfigurable]()
   private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
-  private var currentConfig = kafkaConfig
+  private var currentConfig: KafkaConfig = null
   private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
 
-  private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+  private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {

Review comment:
       Colin has an upcoming KIP which will cover secret storage. It is a known gap at the moment.







[GitHub] [kafka] dajac commented on a change in pull request #11448: KAFKA-13417; Ensure dynamic reconfigurations set old config properly

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11448:
URL: https://github.com/apache/kafka/pull/11448#discussion_r739089348



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1341,7 +1341,7 @@ object KafkaConfig {
     fromProps(props, doLog)
   }
 
-  def apply(props: java.util.Map[_, _]): KafkaConfig = new KafkaConfig(props, true)
+  def apply(props: java.util.Map[_, _], doLog: Boolean = true): KafkaConfig = new KafkaConfig(props, true)

Review comment:
       Did you mean to pass `doLog` to `KafkaConfig` as well?
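
The concern can be shown with a small stand-in. `LoggedConfig`, `applyIgnoringFlag`, and `applyForwardingFlag` are hypothetical names; the first factory mirrors the shape of the diff line above (a `doLog` parameter is accepted but the literal `true` is passed on), and the second shows the presumably intended forwarding.

```scala
// Hypothetical stand-in for a config class that takes a logging flag.
object DoLogSketch {
  final class LoggedConfig(val props: Map[String, String], val doLog: Boolean)

  object LoggedConfig {
    // Same shape as the reviewed diff line: the parameter is accepted but ignored.
    def applyIgnoringFlag(props: Map[String, String], doLog: Boolean = true): LoggedConfig =
      new LoggedConfig(props, true)

    // Forwards the caller's choice to the constructor.
    def applyForwardingFlag(props: Map[String, String], doLog: Boolean = true): LoggedConfig =
      new LoggedConfig(props, doLog)
  }
}
```

With the first factory, a caller passing `doLog = false` still gets a config that logs; the default argument hides the mistake because every call compiles either way.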







[GitHub] [kafka] rajinisivaram commented on a change in pull request #11448: KAFKA-13417; Ensure dynamic reconfigurations set old config properly

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #11448:
URL: https://github.com/apache/kafka/pull/11448#discussion_r745962245



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -204,16 +204,19 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val reconfigurables = mutable.Buffer[Reconfigurable]()
   private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
-  private var currentConfig = kafkaConfig
+  private var currentConfig: KafkaConfig = null
   private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
 
-  private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+  private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {

Review comment:
       Sounds good.







[GitHub] [kafka] hachikuji merged pull request #11448: KAFKA-13417; Ensure dynamic reconfigurations set old config properly

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #11448:
URL: https://github.com/apache/kafka/pull/11448


   





[GitHub] [kafka] rajinisivaram commented on a change in pull request #11448: KAFKA-13417; Ensure dynamic reconfigurations set old config properly

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #11448:
URL: https://github.com/apache/kafka/pull/11448#discussion_r740091138



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -516,7 +516,15 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     newProps ++= staticBrokerConfigs
     overrideProps(newProps, dynamicDefaultConfigs)
     overrideProps(newProps, dynamicBrokerConfigs)
-    val oldConfig = currentConfig
+
+    // We need a copy of the current config since `currentConfig` is initialized with `kafkaConfig`
+    // which means the call to `updateCurrentConfig` would end up mutating `oldConfig`.
+    val oldConfig = if (kafkaConfig eq currentConfig) {

Review comment:
       @hachikuji Thanks for the PR. Do you know whether the issue affects only KRaft, or ZK as well?
   
   With ZK, the sequence is:
   1. Create KafkaConfig with static configs from server.properties
   2. Initialize KafkaConfig with initial configs from ZooKeeper using `DynamicBrokerConfig.initialize(zkClient)`. This always creates a new KafkaConfig, so we don't need this check?
   3. Start DynamicConfigManager. All ZK updates after 2) are handled through change notifications. 
     
   With KRaft, there doesn't seem to be an initialize() for initializing the state from existing dynamic configs, so are all configs handled similarly to 3)? In that case, we should perhaps move the initialization of `currentConfig` from `DynamicBrokerConfig.initialize(zkClient)` to somewhere common for KRaft and avoid this check here?
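
The two startup sequences can be sketched with a shared initialization step. `DynamicProps`, `onChange`, and the `Option`-typed argument are hypothetical stand-ins: `Some(...)` models dynamic configs already persisted at startup (the ZK sequence), and `None` models a startup where overrides only arrive later (the KRaft case as described in this thread).

```scala
// Hypothetical sketch of a common initialization point for both startup modes.
object StartupSketch {
  type Props = Map[String, String]

  final class DynamicProps(static: Props) {
    private var current: Option[Props] = None

    // Step 2: always build a fresh config from static props plus any persisted overrides.
    def initialize(persisted: Option[Props]): Unit =
      current = Some(static ++ persisted.getOrElse(Map.empty[String, String]))

    // Step 3: a later change notification; returns (old, new) as a listener would see them.
    def onChange(overrides: Props): (Props, Props) = {
      val old = current.getOrElse(throw new IllegalStateException("not initialized"))
      val updated = static ++ overrides
      current = Some(updated)
      (old, updated)
    }
  }
}
```

In both modes the first change notification sees an old config that is distinct from the new one, so no identity check against the static config is needed at update time.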







[GitHub] [kafka] rajinisivaram commented on a change in pull request #11448: KAFKA-13417; Ensure dynamic reconfigurations set old config properly

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #11448:
URL: https://github.com/apache/kafka/pull/11448#discussion_r745400319



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -114,7 +114,9 @@ class KafkaServer(
   var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
 
   var logDirFailureChannel: LogDirFailureChannel = null
-  var logManager: LogManager = null
+  var _logManager: LogManager = null

Review comment:
       Do we want to make this private?

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -204,16 +204,19 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val reconfigurables = mutable.Buffer[Reconfigurable]()
   private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
-  private var currentConfig = kafkaConfig
+  private var currentConfig: KafkaConfig = null
   private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
 
-  private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+  private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {

Review comment:
       In the non-ZK world, does the initialization using existing dynamic configs happen later? Or does the original Kafka config contain the existing dynamic configs as well?







[GitHub] [kafka] hachikuji commented on a change in pull request #11448: KAFKA-13417; Ensure dynamic reconfigurations set old config properly

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11448:
URL: https://github.com/apache/kafka/pull/11448#discussion_r744968165



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -516,7 +516,15 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     newProps ++= staticBrokerConfigs
     overrideProps(newProps, dynamicDefaultConfigs)
     overrideProps(newProps, dynamicBrokerConfigs)
-    val oldConfig = currentConfig
+
+    // We need a copy of the current config since `currentConfig` is initialized with `kafkaConfig`
+    // which means the call to `updateCurrentConfig` would end up mutating `oldConfig`.
+    val oldConfig = if (kafkaConfig eq currentConfig) {

Review comment:
       @rajinisivaram Hmm, good point. It looks like the bug only affects KRaft since there is no call to `initialize`. The code still feels a little slippery though, so maybe there is room for some defensiveness. Let me take a look.







[GitHub] [kafka] hachikuji commented on a change in pull request #11448: KAFKA-13417; Ensure dynamic reconfigurations set old config properly

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11448:
URL: https://github.com/apache/kafka/pull/11448#discussion_r745839478



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -204,16 +204,19 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val reconfigurables = mutable.Buffer[Reconfigurable]()
   private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
-  private var currentConfig = kafkaConfig
+  private var currentConfig: KafkaConfig = null
   private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
 
-  private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+  private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {

Review comment:
       It looks like dynamic configs get initialized later when we load the metadata log. I think this probably could be improved. Do you see any specific issues?







[GitHub] [kafka] hachikuji commented on pull request #11448: KAFKA-13417; Ensure dynamic reconfigurations set old config properly

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #11448:
URL: https://github.com/apache/kafka/pull/11448#issuecomment-954220239


   Note I am planning to add some additional test cases here. We should cover all of the `DynamicThreadPool` configurations and unclean leader election at a minimum.

