You are viewing a plain text version of this content; the canonical (HTML) version is available from the mailing-list archive entry this message was exported from (the hyperlink was removed during plain-text conversion).
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/11/09 21:44:44 UTC

[kafka] branch trunk updated: KAFKA-13417; Ensure dynamic reconfigurations set old config properly (#11448)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 78a5e92  KAFKA-13417; Ensure dynamic reconfigurations set old config properly (#11448)
78a5e92 is described below

commit 78a5e921d4566f2e2d56714834e3c5dd6ad17125
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Nov 9 13:42:54 2021 -0800

    KAFKA-13417; Ensure dynamic reconfigurations set old config properly (#11448)
    
    This patch fixes a bug in `DynamicBrokerConfig` which causes some configuration changes to be ignored. In particular, the bug is the result of the reference to the old configuration getting indirectly mutated prior to the call to `BrokerReconfigurable.reconfigure`. This causes the first dynamic configuration update to pass effectively the same configuration as both `oldConfig` and `newConfig`. In cases such as in `DynamicThreadPool`, the update is ignored because the old configuration [...]
    
    This bug only affects KRaft. It is protected in the zk broker by the call to `DynamicBrokerConfig.initialize()`, which overwrites the stored reference to the original configuration. The patch fixes the problem by ensuring that `initialize()` is also invoked in KRaft when `BrokerServer` starts up.
    
    Reviewers: David Jacot <dj...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   2 +
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  23 ++--
 core/src/main/scala/kafka/server/KafkaConfig.scala |   2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  18 +--
 .../main/scala/kafka/server/ReplicaManager.scala   |   4 +
 .../kafka/server/DynamicBrokerConfigTest.scala     | 136 +++++++++++++++++++--
 6 files changed, 154 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index ad549ff..839ee17 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -181,6 +181,8 @@ class BrokerServer(
     try {
       info("Starting broker")
 
+      config.dynamicConfig.initialize(zkClientOpt = None)
+
       lifecycleManager = new BrokerLifecycleManager(config, time, threadNamePrefix)
 
       /* start scheduler */
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 9bbf49d..e4d32fc 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -204,16 +204,19 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val reconfigurables = mutable.Buffer[Reconfigurable]()
   private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
-  private var currentConfig = kafkaConfig
+  private var currentConfig: KafkaConfig = null
   private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
 
-  private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+  private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {
     currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
-    val adminZkClient = new AdminZkClient(zkClient)
-    updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
-    val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
-    val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
-    updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
+
+    zkClientOpt.foreach { zkClient =>
+      val adminZkClient = new AdminZkClient(zkClient)
+      updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
+      val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
+      val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
+      updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
+    }
   }
 
   /**
@@ -516,6 +519,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     newProps ++= staticBrokerConfigs
     overrideProps(newProps, dynamicDefaultConfigs)
     overrideProps(newProps, dynamicBrokerConfigs)
+
     val oldConfig = currentConfig
     val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
     if (newConfig ne currentConfig) {
@@ -719,7 +723,7 @@ class DynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {
     if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
       server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
     if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
-      server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
+      server.replicaManager.resizeFetcherThreadPool(newConfig.numReplicaFetchers)
     if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
       server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
     if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
@@ -955,6 +959,3 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
     listeners.map(e => (e.listenerName, e)).toMap
 
 }
-
-
-
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 17da10e..531bdb8 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1392,7 +1392,7 @@ object KafkaConfig {
     fromProps(props, doLog)
   }
 
-  def apply(props: java.util.Map[_, _]): KafkaConfig = new KafkaConfig(props, true)
+  def apply(props: java.util.Map[_, _], doLog: Boolean = true): KafkaConfig = new KafkaConfig(props, doLog)
 
   private def typeOf(name: String): Option[ConfigDef.Type] = Option(configDef.configKeys.get(name)).map(_.`type`)
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 88ccb6d..9f5674e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -114,9 +114,9 @@ class KafkaServer(
   var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
 
   var logDirFailureChannel: LogDirFailureChannel = null
-  var logManager: LogManager = null
+  @volatile private var _logManager: LogManager = null
 
-  @volatile private[this] var _replicaManager: ReplicaManager = null
+  @volatile private var _replicaManager: ReplicaManager = null
   var adminManager: ZkAdminManager = null
   var tokenManager: DelegationTokenManager = null
 
@@ -129,7 +129,7 @@ class KafkaServer(
 
   var transactionCoordinator: TransactionCoordinator = null
 
-  var kafkaController: KafkaController = null
+  @volatile private var _kafkaController: KafkaController = null
 
   var forwardingManager: Option[ForwardingManager] = None
 
@@ -173,7 +173,11 @@ class KafkaServer(
 
   private[kafka] def featureChangeListener = _featureChangeListener
 
-  def replicaManager: ReplicaManager = _replicaManager
+  override def replicaManager: ReplicaManager = _replicaManager
+
+  override def logManager: LogManager = _logManager
+
+  def kafkaController: KafkaController = _kafkaController
 
   /**
    * Start up API for bringing up a single instance of the Kafka server.
@@ -230,7 +234,7 @@ class KafkaServer(
 
         // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
         // applied after DynamicConfigManager starts.
-        config.dynamicConfig.initialize(zkClient)
+        config.dynamicConfig.initialize(Some(zkClient))
 
         /* start scheduler */
         kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
@@ -250,7 +254,7 @@ class KafkaServer(
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
         /* start log manager */
-        logManager = LogManager(config, initialOfflineDirs,
+        _logManager = LogManager(config, initialOfflineDirs,
           new ZkConfigRepository(new AdminZkClient(zkClient)),
           kafkaScheduler, time, brokerTopicStats, logDirFailureChannel, config.usesTopicId)
         _brokerState = BrokerState.RECOVERY
@@ -327,7 +331,7 @@ class KafkaServer(
         tokenManager.startup()
 
         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
+        _kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
         kafkaController.startup()
 
         adminManager = new ZkAdminManager(config, metrics, metadataCache, zkClient)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7dbdbd5..eb82fd4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -280,6 +280,10 @@ class ReplicaManager(val config: KafkaConfig,
     replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
   }
 
+  def resizeFetcherThreadPool(newSize: Int): Unit = {
+    replicaFetcherManager.resizeThreadPool(newSize)
+  }
+
   def getLog(topicPartition: TopicPartition): Option[UnifiedLog] = logManager.getLog(topicPartition)
 
   def hasDelayedElectionOperations: Boolean = delayedElectLeaderPurgatory.numDelayed != 0
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index f09cb14..b940bc9 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -20,7 +20,12 @@ package kafka.server
 import java.{lang, util}
 import java.util.Properties
 import java.util.concurrent.CompletionStage
-import kafka.utils.TestUtils
+import java.util.concurrent.atomic.AtomicReference
+
+import kafka.controller.KafkaController
+import kafka.log.{LogConfig, LogManager}
+import kafka.network.SocketServer
+import kafka.utils.{KafkaScheduler, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.{Endpoint, Reconfigurable}
 import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
@@ -30,6 +35,7 @@ import org.apache.kafka.server.authorizer._
 import org.easymock.EasyMock
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
+import org.mockito.{ArgumentMatchers, Mockito}
 
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
@@ -44,7 +50,9 @@ class DynamicBrokerConfigTest {
     props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, oldKeystore)
     val config = KafkaConfig(props)
     val dynamicConfig = config.dynamicConfig
-    assertSame(config, dynamicConfig.currentKafkaConfig)
+    dynamicConfig.initialize(None)
+
+    assertEquals(config, dynamicConfig.currentKafkaConfig)
     assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
     assertEquals(oldKeystore,
       config.valuesFromThisConfigWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
@@ -80,12 +88,107 @@ class DynamicBrokerConfigTest {
     }
   }
 
+  @Test
+  def testEnableDefaultUncleanLeaderElection(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    origProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "false")
+
+    val config = KafkaConfig(origProps)
+    val serverMock = Mockito.mock(classOf[KafkaServer])
+    val controllerMock = Mockito.mock(classOf[KafkaController])
+    val logManagerMock = Mockito.mock(classOf[LogManager])
+
+    Mockito.when(serverMock.config).thenReturn(config)
+    Mockito.when(serverMock.kafkaController).thenReturn(controllerMock)
+    Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
+    Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty)
+
+    val currentDefaultLogConfig = new AtomicReference(LogConfig())
+    Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ => currentDefaultLogConfig.get())
+    Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig])))
+      .thenAnswer(invocation => currentDefaultLogConfig.set(invocation.getArgument(0)))
+
+    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.addBrokerReconfigurable(new DynamicLogConfig(logManagerMock, serverMock))
+
+    val props = new Properties()
+
+    props.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertTrue(config.uncleanLeaderElectionEnable)
+    Mockito.verify(controllerMock).enableDefaultUncleanLeaderElection()
+  }
+
+  @Test
+  def testUpdateDynamicThreadPool(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    origProps.put(KafkaConfig.NumIoThreadsProp, "4")
+    origProps.put(KafkaConfig.NumNetworkThreadsProp, "2")
+    origProps.put(KafkaConfig.NumReplicaFetchersProp, "1")
+    origProps.put(KafkaConfig.NumRecoveryThreadsPerDataDirProp, "1")
+    origProps.put(KafkaConfig.BackgroundThreadsProp, "3")
+
+    val config = KafkaConfig(origProps)
+    val serverMock = Mockito.mock(classOf[KafkaBroker])
+    val handlerPoolMock = Mockito.mock(classOf[KafkaRequestHandlerPool])
+    val socketServerMock = Mockito.mock(classOf[SocketServer])
+    val replicaManagerMock = Mockito.mock(classOf[ReplicaManager])
+    val logManagerMock = Mockito.mock(classOf[LogManager])
+    val schedulerMock = Mockito.mock(classOf[KafkaScheduler])
+
+    Mockito.when(serverMock.config).thenReturn(config)
+    Mockito.when(serverMock.dataPlaneRequestHandlerPool).thenReturn(handlerPoolMock)
+    Mockito.when(serverMock.socketServer).thenReturn(socketServerMock)
+    Mockito.when(serverMock.replicaManager).thenReturn(replicaManagerMock)
+    Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
+    Mockito.when(serverMock.kafkaScheduler).thenReturn(schedulerMock)
+
+    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.addBrokerReconfigurable(new DynamicThreadPool(serverMock))
+
+    val props = new Properties()
+
+    props.put(KafkaConfig.NumIoThreadsProp, "8")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(8, config.numIoThreads)
+    Mockito.verify(handlerPoolMock).resizeThreadPool(newSize = 8)
+
+    props.put(KafkaConfig.NumNetworkThreadsProp, "4")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(4, config.numNetworkThreads)
+    Mockito.verify(socketServerMock).resizeThreadPool(oldNumNetworkThreads = 2, newNumNetworkThreads = 4)
+
+    props.put(KafkaConfig.NumReplicaFetchersProp, "2")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(2, config.numReplicaFetchers)
+    Mockito.verify(replicaManagerMock).resizeFetcherThreadPool(newSize = 2)
+
+    props.put(KafkaConfig.NumRecoveryThreadsPerDataDirProp, "2")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(2, config.numRecoveryThreadsPerDataDir)
+    Mockito.verify(logManagerMock).resizeRecoveryThreadPool(newSize = 2)
+
+    props.put(KafkaConfig.BackgroundThreadsProp, "6")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(6, config.backgroundThreads)
+    Mockito.verify(schedulerMock).resizeThreadPool(newSize = 6)
+
+    Mockito.verifyNoMoreInteractions(
+      handlerPoolMock,
+      socketServerMock,
+      replicaManagerMock,
+      logManagerMock,
+      schedulerMock
+    )
+  }
+
   @nowarn("cat=deprecation")
   @Test
   def testConfigUpdateWithSomeInvalidConfigs(): Unit = {
     val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
     val config = KafkaConfig(origProps)
+    config.dynamicConfig.initialize(None)
 
     val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "ks.p12")
 
@@ -107,6 +210,8 @@ class DynamicBrokerConfigTest {
     val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     origProps.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "100000000")
     val config = KafkaConfig(origProps)
+    config.dynamicConfig.initialize(None)
+
     val validProps = Map.empty[String, String]
     val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "20")
 
@@ -209,6 +314,8 @@ class DynamicBrokerConfigTest {
     val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret")
     val config = KafkaConfig(configProps)
+    config.dynamicConfig.initialize(None)
+
     val props = new Properties
     props.put(name, value)
     val oldValue = config.originals.get(name)
@@ -279,6 +386,7 @@ class DynamicBrokerConfigTest {
     props.put(KafkaConfig.SaslJaasConfigProp, "staticLoginModule required;")
     props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
     val config = KafkaConfig(props)
+    config.dynamicConfig.initialize(None)
     val dynamicProps = new Properties
     dynamicProps.put(KafkaConfig.SaslJaasConfigProp, "dynamicLoginModule required;")
 
@@ -290,6 +398,7 @@ class DynamicBrokerConfigTest {
 
     // New config with same secret should use the dynamic password config
     val newConfigWithSameSecret = KafkaConfig(props)
+    newConfigWithSameSecret.dynamicConfig.initialize(None)
     newConfigWithSameSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
     assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
 
@@ -328,6 +437,8 @@ class DynamicBrokerConfigTest {
   def testAuthorizerConfig(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
     val oldConfig =  KafkaConfig.fromProps(props)
+    oldConfig.dynamicConfig.initialize(None)
+
     val kafkaServer: KafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
 
     class TestAuthorizer extends Authorizer with Reconfigurable {
@@ -377,7 +488,7 @@ class DynamicBrokerConfigTest {
 
     val oldConfig =  KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092))
     val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig)
-    dynamicBrokerConfig.initialize(zkClient)
+    dynamicBrokerConfig.initialize(Some(zkClient))
     dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool)
 
     val newprops = new Properties()
@@ -389,28 +500,29 @@ class DynamicBrokerConfigTest {
   @Test
   def testImproperConfigsAreRemoved(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    val configs = KafkaConfig(props)
+    val config = KafkaConfig(props)
+    config.dynamicConfig.initialize(None)
 
-    assertEquals(Defaults.MaxConnections, configs.maxConnections)
-    assertEquals(Defaults.MessageMaxBytes, configs.messageMaxBytes)
+    assertEquals(Defaults.MaxConnections, config.maxConnections)
+    assertEquals(Defaults.MessageMaxBytes, config.messageMaxBytes)
 
     var newProps = new Properties()
     newProps.put(KafkaConfig.MaxConnectionsProp, "9999")
     newProps.put(KafkaConfig.MessageMaxBytesProp, "2222")
 
-    configs.dynamicConfig.updateDefaultConfig(newProps)
-    assertEquals(9999, configs.maxConnections)
-    assertEquals(2222, configs.messageMaxBytes)
+    config.dynamicConfig.updateDefaultConfig(newProps)
+    assertEquals(9999, config.maxConnections)
+    assertEquals(2222, config.messageMaxBytes)
 
     newProps = new Properties()
     newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT")
     newProps.put(KafkaConfig.MessageMaxBytesProp, "1111")
 
-    configs.dynamicConfig.updateDefaultConfig(newProps)
+    config.dynamicConfig.updateDefaultConfig(newProps)
     // Invalid value should be skipped and reassigned as default value
-    assertEquals(Defaults.MaxConnections, configs.maxConnections)
+    assertEquals(Defaults.MaxConnections, config.maxConnections)
     // Even if One property is invalid, the below should get correctly updated.
-    assertEquals(1111, configs.messageMaxBytes)
+    assertEquals(1111, config.messageMaxBytes)
   }
 }