You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/11/09 21:44:44 UTC
[kafka] branch trunk updated: KAFKA-13417; Ensure dynamic reconfigurations set old config properly (#11448)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 78a5e92 KAFKA-13417; Ensure dynamic reconfigurations set old config properly (#11448)
78a5e92 is described below
commit 78a5e921d4566f2e2d56714834e3c5dd6ad17125
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Nov 9 13:42:54 2021 -0800
KAFKA-13417; Ensure dynamic reconfigurations set old config properly (#11448)
This patch fixes a bug in `DynamicBrokerConfig` that causes some configuration changes to be ignored. In particular, the bug is the result of the reference to the old configuration getting indirectly mutated prior to the call to `BrokerReconfigurable.reconfigure`. This causes the first dynamic configuration update to pass effectively the same configuration as both `oldConfig` and `newConfig`. In cases such as `DynamicThreadPool`, the update is ignored because the old configuration [...]
This bug only affects KRaft. It is protected in the zk broker by the call to `DynamicBrokerConfig.initialize()`, which overwrites the stored reference to the original configuration. The patch fixes the problem by ensuring that `initialize()` is also invoked in KRaft when `BrokerServer` starts up.
Reviewers: David Jacot <dj...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +
.../scala/kafka/server/DynamicBrokerConfig.scala | 23 ++--
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 18 +--
.../main/scala/kafka/server/ReplicaManager.scala | 4 +
.../kafka/server/DynamicBrokerConfigTest.scala | 136 +++++++++++++++++++--
6 files changed, 154 insertions(+), 31 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index ad549ff..839ee17 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -181,6 +181,8 @@ class BrokerServer(
try {
info("Starting broker")
+ config.dynamicConfig.initialize(zkClientOpt = None)
+
lifecycleManager = new BrokerLifecycleManager(config, time, threadNamePrefix)
/* start scheduler */
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 9bbf49d..e4d32fc 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -204,16 +204,19 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val reconfigurables = mutable.Buffer[Reconfigurable]()
private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
- private var currentConfig = kafkaConfig
+ private var currentConfig: KafkaConfig = null
private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
- private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+ private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {
currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
- val adminZkClient = new AdminZkClient(zkClient)
- updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
- val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
- val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
- updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
+
+ zkClientOpt.foreach { zkClient =>
+ val adminZkClient = new AdminZkClient(zkClient)
+ updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
+ val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
+ val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
+ updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
+ }
}
/**
@@ -516,6 +519,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
newProps ++= staticBrokerConfigs
overrideProps(newProps, dynamicDefaultConfigs)
overrideProps(newProps, dynamicBrokerConfigs)
+
val oldConfig = currentConfig
val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
if (newConfig ne currentConfig) {
@@ -719,7 +723,7 @@ class DynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {
if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
- server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
+ server.replicaManager.resizeFetcherThreadPool(newConfig.numReplicaFetchers)
if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
@@ -955,6 +959,3 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
listeners.map(e => (e.listenerName, e)).toMap
}
-
-
-
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 17da10e..531bdb8 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1392,7 +1392,7 @@ object KafkaConfig {
fromProps(props, doLog)
}
- def apply(props: java.util.Map[_, _]): KafkaConfig = new KafkaConfig(props, true)
+ def apply(props: java.util.Map[_, _], doLog: Boolean = true): KafkaConfig = new KafkaConfig(props, doLog)
private def typeOf(name: String): Option[ConfigDef.Type] = Option(configDef.configKeys.get(name)).map(_.`type`)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 88ccb6d..9f5674e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -114,9 +114,9 @@ class KafkaServer(
var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
var logDirFailureChannel: LogDirFailureChannel = null
- var logManager: LogManager = null
+ @volatile private var _logManager: LogManager = null
- @volatile private[this] var _replicaManager: ReplicaManager = null
+ @volatile private var _replicaManager: ReplicaManager = null
var adminManager: ZkAdminManager = null
var tokenManager: DelegationTokenManager = null
@@ -129,7 +129,7 @@ class KafkaServer(
var transactionCoordinator: TransactionCoordinator = null
- var kafkaController: KafkaController = null
+ @volatile private var _kafkaController: KafkaController = null
var forwardingManager: Option[ForwardingManager] = None
@@ -173,7 +173,11 @@ class KafkaServer(
private[kafka] def featureChangeListener = _featureChangeListener
- def replicaManager: ReplicaManager = _replicaManager
+ override def replicaManager: ReplicaManager = _replicaManager
+
+ override def logManager: LogManager = _logManager
+
+ def kafkaController: KafkaController = _kafkaController
/**
* Start up API for bringing up a single instance of the Kafka server.
@@ -230,7 +234,7 @@ class KafkaServer(
// initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
// applied after DynamicConfigManager starts.
- config.dynamicConfig.initialize(zkClient)
+ config.dynamicConfig.initialize(Some(zkClient))
/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
@@ -250,7 +254,7 @@ class KafkaServer(
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
/* start log manager */
- logManager = LogManager(config, initialOfflineDirs,
+ _logManager = LogManager(config, initialOfflineDirs,
new ZkConfigRepository(new AdminZkClient(zkClient)),
kafkaScheduler, time, brokerTopicStats, logDirFailureChannel, config.usesTopicId)
_brokerState = BrokerState.RECOVERY
@@ -327,7 +331,7 @@ class KafkaServer(
tokenManager.startup()
/* start kafka controller */
- kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
+ _kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
kafkaController.startup()
adminManager = new ZkAdminManager(config, metrics, metadataCache, zkClient)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7dbdbd5..eb82fd4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -280,6 +280,10 @@ class ReplicaManager(val config: KafkaConfig,
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
}
+ def resizeFetcherThreadPool(newSize: Int): Unit = {
+ replicaFetcherManager.resizeThreadPool(newSize)
+ }
+
def getLog(topicPartition: TopicPartition): Option[UnifiedLog] = logManager.getLog(topicPartition)
def hasDelayedElectionOperations: Boolean = delayedElectLeaderPurgatory.numDelayed != 0
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index f09cb14..b940bc9 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -20,7 +20,12 @@ package kafka.server
import java.{lang, util}
import java.util.Properties
import java.util.concurrent.CompletionStage
-import kafka.utils.TestUtils
+import java.util.concurrent.atomic.AtomicReference
+
+import kafka.controller.KafkaController
+import kafka.log.{LogConfig, LogManager}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
@@ -30,6 +35,7 @@ import org.apache.kafka.server.authorizer._
import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
+import org.mockito.{ArgumentMatchers, Mockito}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@@ -44,7 +50,9 @@ class DynamicBrokerConfigTest {
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, oldKeystore)
val config = KafkaConfig(props)
val dynamicConfig = config.dynamicConfig
- assertSame(config, dynamicConfig.currentKafkaConfig)
+ dynamicConfig.initialize(None)
+
+ assertEquals(config, dynamicConfig.currentKafkaConfig)
assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertEquals(oldKeystore,
config.valuesFromThisConfigWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
@@ -80,12 +88,107 @@ class DynamicBrokerConfigTest {
}
}
+ @Test
+ def testEnableDefaultUncleanLeaderElection(): Unit = {
+ val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ origProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "false")
+
+ val config = KafkaConfig(origProps)
+ val serverMock = Mockito.mock(classOf[KafkaServer])
+ val controllerMock = Mockito.mock(classOf[KafkaController])
+ val logManagerMock = Mockito.mock(classOf[LogManager])
+
+ Mockito.when(serverMock.config).thenReturn(config)
+ Mockito.when(serverMock.kafkaController).thenReturn(controllerMock)
+ Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
+ Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty)
+
+ val currentDefaultLogConfig = new AtomicReference(LogConfig())
+ Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ => currentDefaultLogConfig.get())
+ Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig])))
+ .thenAnswer(invocation => currentDefaultLogConfig.set(invocation.getArgument(0)))
+
+ config.dynamicConfig.initialize(None)
+ config.dynamicConfig.addBrokerReconfigurable(new DynamicLogConfig(logManagerMock, serverMock))
+
+ val props = new Properties()
+
+ props.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertTrue(config.uncleanLeaderElectionEnable)
+ Mockito.verify(controllerMock).enableDefaultUncleanLeaderElection()
+ }
+
+ @Test
+ def testUpdateDynamicThreadPool(): Unit = {
+ val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ origProps.put(KafkaConfig.NumIoThreadsProp, "4")
+ origProps.put(KafkaConfig.NumNetworkThreadsProp, "2")
+ origProps.put(KafkaConfig.NumReplicaFetchersProp, "1")
+ origProps.put(KafkaConfig.NumRecoveryThreadsPerDataDirProp, "1")
+ origProps.put(KafkaConfig.BackgroundThreadsProp, "3")
+
+ val config = KafkaConfig(origProps)
+ val serverMock = Mockito.mock(classOf[KafkaBroker])
+ val handlerPoolMock = Mockito.mock(classOf[KafkaRequestHandlerPool])
+ val socketServerMock = Mockito.mock(classOf[SocketServer])
+ val replicaManagerMock = Mockito.mock(classOf[ReplicaManager])
+ val logManagerMock = Mockito.mock(classOf[LogManager])
+ val schedulerMock = Mockito.mock(classOf[KafkaScheduler])
+
+ Mockito.when(serverMock.config).thenReturn(config)
+ Mockito.when(serverMock.dataPlaneRequestHandlerPool).thenReturn(handlerPoolMock)
+ Mockito.when(serverMock.socketServer).thenReturn(socketServerMock)
+ Mockito.when(serverMock.replicaManager).thenReturn(replicaManagerMock)
+ Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
+ Mockito.when(serverMock.kafkaScheduler).thenReturn(schedulerMock)
+
+ config.dynamicConfig.initialize(None)
+ config.dynamicConfig.addBrokerReconfigurable(new DynamicThreadPool(serverMock))
+
+ val props = new Properties()
+
+ props.put(KafkaConfig.NumIoThreadsProp, "8")
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(8, config.numIoThreads)
+ Mockito.verify(handlerPoolMock).resizeThreadPool(newSize = 8)
+
+ props.put(KafkaConfig.NumNetworkThreadsProp, "4")
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(4, config.numNetworkThreads)
+ Mockito.verify(socketServerMock).resizeThreadPool(oldNumNetworkThreads = 2, newNumNetworkThreads = 4)
+
+ props.put(KafkaConfig.NumReplicaFetchersProp, "2")
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(2, config.numReplicaFetchers)
+ Mockito.verify(replicaManagerMock).resizeFetcherThreadPool(newSize = 2)
+
+ props.put(KafkaConfig.NumRecoveryThreadsPerDataDirProp, "2")
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(2, config.numRecoveryThreadsPerDataDir)
+ Mockito.verify(logManagerMock).resizeRecoveryThreadPool(newSize = 2)
+
+ props.put(KafkaConfig.BackgroundThreadsProp, "6")
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(6, config.backgroundThreads)
+ Mockito.verify(schedulerMock).resizeThreadPool(newSize = 6)
+
+ Mockito.verifyNoMoreInteractions(
+ handlerPoolMock,
+ socketServerMock,
+ replicaManagerMock,
+ logManagerMock,
+ schedulerMock
+ )
+ }
+
@nowarn("cat=deprecation")
@Test
def testConfigUpdateWithSomeInvalidConfigs(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
val config = KafkaConfig(origProps)
+ config.dynamicConfig.initialize(None)
val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "ks.p12")
@@ -107,6 +210,8 @@ class DynamicBrokerConfigTest {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
origProps.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "100000000")
val config = KafkaConfig(origProps)
+ config.dynamicConfig.initialize(None)
+
val validProps = Map.empty[String, String]
val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "20")
@@ -209,6 +314,8 @@ class DynamicBrokerConfigTest {
val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret")
val config = KafkaConfig(configProps)
+ config.dynamicConfig.initialize(None)
+
val props = new Properties
props.put(name, value)
val oldValue = config.originals.get(name)
@@ -279,6 +386,7 @@ class DynamicBrokerConfigTest {
props.put(KafkaConfig.SaslJaasConfigProp, "staticLoginModule required;")
props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
val config = KafkaConfig(props)
+ config.dynamicConfig.initialize(None)
val dynamicProps = new Properties
dynamicProps.put(KafkaConfig.SaslJaasConfigProp, "dynamicLoginModule required;")
@@ -290,6 +398,7 @@ class DynamicBrokerConfigTest {
// New config with same secret should use the dynamic password config
val newConfigWithSameSecret = KafkaConfig(props)
+ newConfigWithSameSecret.dynamicConfig.initialize(None)
newConfigWithSameSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
@@ -328,6 +437,8 @@ class DynamicBrokerConfigTest {
def testAuthorizerConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
val oldConfig = KafkaConfig.fromProps(props)
+ oldConfig.dynamicConfig.initialize(None)
+
val kafkaServer: KafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
class TestAuthorizer extends Authorizer with Reconfigurable {
@@ -377,7 +488,7 @@ class DynamicBrokerConfigTest {
val oldConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092))
val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig)
- dynamicBrokerConfig.initialize(zkClient)
+ dynamicBrokerConfig.initialize(Some(zkClient))
dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool)
val newprops = new Properties()
@@ -389,28 +500,29 @@ class DynamicBrokerConfigTest {
@Test
def testImproperConfigsAreRemoved(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
- val configs = KafkaConfig(props)
+ val config = KafkaConfig(props)
+ config.dynamicConfig.initialize(None)
- assertEquals(Defaults.MaxConnections, configs.maxConnections)
- assertEquals(Defaults.MessageMaxBytes, configs.messageMaxBytes)
+ assertEquals(Defaults.MaxConnections, config.maxConnections)
+ assertEquals(Defaults.MessageMaxBytes, config.messageMaxBytes)
var newProps = new Properties()
newProps.put(KafkaConfig.MaxConnectionsProp, "9999")
newProps.put(KafkaConfig.MessageMaxBytesProp, "2222")
- configs.dynamicConfig.updateDefaultConfig(newProps)
- assertEquals(9999, configs.maxConnections)
- assertEquals(2222, configs.messageMaxBytes)
+ config.dynamicConfig.updateDefaultConfig(newProps)
+ assertEquals(9999, config.maxConnections)
+ assertEquals(2222, config.messageMaxBytes)
newProps = new Properties()
newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT")
newProps.put(KafkaConfig.MessageMaxBytesProp, "1111")
- configs.dynamicConfig.updateDefaultConfig(newProps)
+ config.dynamicConfig.updateDefaultConfig(newProps)
// Invalid value should be skipped and reassigned as default value
- assertEquals(Defaults.MaxConnections, configs.maxConnections)
+ assertEquals(Defaults.MaxConnections, config.maxConnections)
// Even if One property is invalid, the below should get correctly updated.
- assertEquals(1111, configs.messageMaxBytes)
+ assertEquals(1111, config.messageMaxBytes)
}
}