You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/24 03:55:54 UTC

[kafka] branch trunk updated: KAFKA-13672: Race condition in DynamicBrokerConfig (#11920)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e8f0900  KAFKA-13672: Race condition in DynamicBrokerConfig (#11920)
e8f0900 is described below

commit e8f09007e4259d8a6adcba162b0385b66783a7f8
Author: Liam Clarke-Hutchinson <li...@steelsky.co.nz>
AuthorDate: Thu Mar 24 16:54:05 2022 +1300

    KAFKA-13672: Race condition in DynamicBrokerConfig (#11920)
    
    Reviewers: David Jacot <dj...@confluent.io>, Luke Chen <sh...@gmail.com>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 22 +++++++++++++---------
 .../server/DynamicBrokerReconfigurationTest.scala  |  1 -
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index cb6cd84..2a4fd95 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import java.util
 import java.util.{Collections, Properties}
+import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.cluster.EndPoint
 import kafka.log.{LogCleaner, LogConfig, LogManager}
@@ -201,8 +202,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
   private val dynamicBrokerConfigs = mutable.Map[String, String]()
   private val dynamicDefaultConfigs = mutable.Map[String, String]()
-  private val reconfigurables = mutable.Buffer[Reconfigurable]()
-  private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
+
+  // Use CopyOnWriteArrayList so that a thread iterating over these collections does not hit a
+  // ConcurrentModificationException when another thread adds an item to them at the same time.
+  private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
+  private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
   private var currentConfig: KafkaConfig = null
   private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
@@ -259,16 +263,16 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
     verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
-    reconfigurables += reconfigurable
+    reconfigurables.add(reconfigurable)
   }
 
   def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = CoreUtils.inWriteLock(lock) {
     verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
-    brokerReconfigurables += reconfigurable
+    brokerReconfigurables.add(reconfigurable)
   }
 
   def removeReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
-    reconfigurables -= reconfigurable
+    reconfigurables.remove(reconfigurable)
   }
 
   private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) {
@@ -320,7 +324,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
    * changes are processed. At the moment, only listener configs are considered for reloading.
    */
   private[server] def reloadUpdatedFilesWithoutConfigChange(newProps: Properties): Unit = CoreUtils.inWriteLock(lock) {
-    reconfigurables
+    reconfigurables.asScala
       .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
       .foreach {
         case reconfigurable: ListenerReconfigurable =>
@@ -535,8 +539,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
       try {
         val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs
-        newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
-        reconfigurables.foreach {
+        newConfig.valuesFromThisConfig.keySet.forEach(k => customConfigs.remove(k))
+        reconfigurables.forEach {
           case listenerReconfigurable: ListenerReconfigurable =>
             processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false)
           case reconfigurable =>
@@ -546,7 +550,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
 
         // BrokerReconfigurable updates are processed after config is updated. Only do the validation here.
         val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]()
-        brokerReconfigurables.foreach { reconfigurable =>
+        brokerReconfigurables.forEach { reconfigurable =>
           if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) {
             reconfigurable.validateReconfiguration(newConfig)
             if (!validateOnly)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 1d64106..dee4f01 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -728,7 +728,6 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   }
 
   @Test
-  @Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13672)
   def testThreadPoolResize(): Unit = {
     val requestHandlerPrefix = "data-plane-kafka-request-handler-"
     val networkThreadPrefix = "data-plane-kafka-network-thread-"