You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/03/12 09:53:18 UTC

[kafka] branch 2.2 updated: KAFKA-7976; Update config before notifying controller of unclean leader update (#6426)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new c43b914  KAFKA-7976; Update config before notifying controller of unclean leader update (#6426)
c43b914 is described below

commit c43b914d909ab5460562f4c741b332f2a114a3a5
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Mar 12 09:47:05 2019 +0000

    KAFKA-7976; Update config before notifying controller of unclean leader update (#6426)
    
    When unclean leader election is enabled dynamically on brokers, we notify controller of the update before updating KafkaConfig. When processing this event, controller's decision to elect unclean leaders is based on the current KafkaConfig, so there is a small timing window when the controller may not elect unclean leader because KafkaConfig of the server was not yet updated. The commit fixes this timing window by using the existing BrokerReconfigurable interface used by other classes  [...]
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 31 +++++++++++++++-------
 .../server/DynamicBrokerReconfigurationTest.scala  |  4 ++-
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 1c56572..b02b842 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -204,13 +204,25 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     brokerReconfigurables.clear()
   }
 
+  /**
+   * Add reconfigurables to be notified when a dynamic broker config is updated.
+   *
+   * `Reconfigurable` is the public API used by configurable plugins like metrics reporter
+   * and quota callbacks. These are reconfigured before `KafkaConfig` is updated so that
+   * the update can be aborted if `reconfigure()` fails with an exception.
+   *
+   * `BrokerReconfigurable` is used for internal reconfigurable classes. These are
+   * reconfigured after `KafkaConfig` is updated so that they can access `KafkaConfig`
+   * directly. They are provided both old and new configs.
+   */
   def addReconfigurables(kafkaServer: KafkaServer): Unit = {
+    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
+    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
+
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
     if (kafkaServer.logManager.cleaner != null)
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
-    addReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
-    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
-    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
+    addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
     addBrokerReconfigurable(new DynamicConnectionQuota(kafkaServer))
   }
@@ -565,25 +577,24 @@ object DynamicLogConfig {
   val ReconfigurableConfigs = LogConfig.TopicConfigSynonyms.values.toSet -- ExcludedConfigs
   val KafkaConfigToLogConfigName = LogConfig.TopicConfigSynonyms.map { case (k, v) => (v, k) }
 }
-class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Reconfigurable with Logging {
 
-  override def configure(configs: util.Map[String, _]): Unit = {}
+class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends BrokerReconfigurable with Logging {
 
-  override def reconfigurableConfigs(): util.Set[String] = {
-    DynamicLogConfig.ReconfigurableConfigs.asJava
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicLogConfig.ReconfigurableConfigs
   }
 
-  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     // For update of topic config overrides, only config names and types are validated
     // Names and types have already been validated. For consistency with topic config
     // validation, no additional validation is performed.
   }
 
-  override def reconfigure(configs: util.Map[String, _]): Unit = {
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     val currentLogConfig = logManager.currentDefaultConfig
     val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
     val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals)
-    configs.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach { case (k, v) =>
+    newConfig.valuesFromThisConfig.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach { case (k, v) =>
       if (v != null) {
         DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
           newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 798961e..c28fcea 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -391,7 +391,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
     // Verify that configs of existing logs have been updated
     val newLogConfig = LogConfig(KafkaServer.copyKafkaConfigToLog(servers.head.config))
-    assertEquals(newLogConfig, servers.head.logManager.currentDefaultConfig)
+    TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig == newLogConfig,
+      "Config not updated in LogManager")
+
     val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
     TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing topic config using defaults not updated")
     props.asScala.foreach { case (k, v) =>