You are viewing a plain text version of this content. (The canonical link was not preserved in this plain-text copy.)
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/24 03:55:54 UTC
[kafka] branch trunk updated: KAFKA-13672: Race condition in DynamicBrokerConfig (#11920)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e8f0900 KAFKA-13672: Race condition in DynamicBrokerConfig (#11920)
e8f0900 is described below
commit e8f09007e4259d8a6adcba162b0385b66783a7f8
Author: Liam Clarke-Hutchinson <li...@steelsky.co.nz>
AuthorDate: Thu Mar 24 16:54:05 2022 +1300
KAFKA-13672: Race condition in DynamicBrokerConfig (#11920)
Reviewers: David Jacot <dj...@confluent.io>, Luke Chen <sh...@gmail.com>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 22 +++++++++++++---------
.../server/DynamicBrokerReconfigurationTest.scala | 1 -
2 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index cb6cd84..2a4fd95 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -19,6 +19,7 @@ package kafka.server
import java.util
import java.util.{Collections, Properties}
+import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogConfig, LogManager}
@@ -201,8 +202,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
private val dynamicBrokerConfigs = mutable.Map[String, String]()
private val dynamicDefaultConfigs = mutable.Map[String, String]()
- private val reconfigurables = mutable.Buffer[Reconfigurable]()
- private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
+
+ // Use COWArrayList to prevent concurrent modification exception when an item is added by one thread to these
+ // collections, while another thread is iterating over them.
+ private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
+ private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var currentConfig: KafkaConfig = null
private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
@@ -259,16 +263,16 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
- reconfigurables += reconfigurable
+ reconfigurables.add(reconfigurable)
}
def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = CoreUtils.inWriteLock(lock) {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
- brokerReconfigurables += reconfigurable
+ brokerReconfigurables.add(reconfigurable)
}
def removeReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
- reconfigurables -= reconfigurable
+ reconfigurables.remove(reconfigurable)
}
private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) {
@@ -320,7 +324,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
* changes are processed. At the moment, only listener configs are considered for reloading.
*/
private[server] def reloadUpdatedFilesWithoutConfigChange(newProps: Properties): Unit = CoreUtils.inWriteLock(lock) {
- reconfigurables
+ reconfigurables.asScala
.filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
.foreach {
case reconfigurable: ListenerReconfigurable =>
@@ -535,8 +539,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
try {
val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs
- newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
- reconfigurables.foreach {
+ newConfig.valuesFromThisConfig.keySet.forEach(k => customConfigs.remove(k))
+ reconfigurables.forEach {
case listenerReconfigurable: ListenerReconfigurable =>
processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false)
case reconfigurable =>
@@ -546,7 +550,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
// BrokerReconfigurable updates are processed after config is updated. Only do the validation here.
val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]()
- brokerReconfigurables.foreach { reconfigurable =>
+ brokerReconfigurables.forEach { reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) {
reconfigurable.validateReconfiguration(newConfig)
if (!validateOnly)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 1d64106..dee4f01 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -728,7 +728,6 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
@Test
- @Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13672)
def testThreadPoolResize(): Unit = {
val requestHandlerPrefix = "data-plane-kafka-request-handler-"
val networkThreadPrefix = "data-plane-kafka-network-thread-"