You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/19 22:22:56 UTC

[kafka] branch trunk updated: KAFKA-6680: Fix issues related to Dynamic Broker configs (#4731)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aefe35e   KAFKA-6680: Fix issues related to Dynamic Broker configs (#4731)
aefe35e is described below

commit aefe35e493758e439d7dd5cb5aa50762132a5e5d
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Tue Mar 20 03:52:53 2018 +0530

     KAFKA-6680: Fix issues related to Dynamic Broker configs (#4731)
    
    - Fix kafkaConfig initialization if there are no dynamic configs defined in ZK.
    - Update DynamicListenerConfig.validateReconfiguration() to check new Listeners must be subset of listener map
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  5 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     | 54 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 3236af0..92fd5d7 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -137,6 +137,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
 
   private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+    currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
     val adminZkClient = new AdminZkClient(zkClient)
     updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
     val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
@@ -719,8 +720,8 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
     val oldListeners = listenersToMap(oldConfig.listeners)
     if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
       throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
-    if (newListeners.keySet != newConfig.listenerSecurityProtocolMap.keySet)
-      throw new ConfigException(s"Listeners '$newListeners' and listener map '${newConfig.listenerSecurityProtocolMap}' don't match")
+    if (!newListeners.keySet.subsetOf(newConfig.listenerSecurityProtocolMap.keySet))
+      throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.listenerSecurityProtocolMap}'")
     newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
       val prefix = listenerName.configPrefix
       val newListenerProps = immutableListenerConfigs(newConfig, prefix)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 2e3e274..bca98d2 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -24,10 +24,12 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigException, SslConfigs}
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.JavaConverters._
+import scala.collection.Set
 
 class DynamicBrokerConfigTest {
 
@@ -248,4 +250,56 @@ class DynamicBrokerConfigTest {
     newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
     assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
   }
+
+  @Test
+  def testDynamicListenerConfig(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
+    val oldConfig =  KafkaConfig.fromProps(props)
+    val kafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
+    EasyMock.expect(kafkaServer.config).andReturn(oldConfig).anyTimes()
+    EasyMock.replay(kafkaServer)
+
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
+    val newConfig = KafkaConfig(props)
+
+    val dynamicListenerConfig = new DynamicListenerConfig(kafkaServer)
+    dynamicListenerConfig.validateReconfiguration(newConfig)
+  }
+
+  @Test
+  def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
+    val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
+    EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new java.util.Properties()).anyTimes()
+    EasyMock.replay(zkClient)
+
+    val oldConfig =  KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092))
+    val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig)
+    dynamicBrokerConfig.initialize(zkClient)
+    dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool)
+
+    val newprops = new Properties()
+    newprops.put(KafkaConfig.NumIoThreadsProp, "10")
+    newprops.put(KafkaConfig.BackgroundThreadsProp, "100")
+    dynamicBrokerConfig.updateBrokerConfig(0, newprops)
+  }
 }
+
+class TestDynamicThreadPool() extends BrokerReconfigurable {
+
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicThreadPool.ReconfigurableConfigs
+  }
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    assertEquals(Defaults.NumIoThreads, oldConfig.numIoThreads)
+    assertEquals(Defaults.BackgroundThreads, oldConfig.backgroundThreads)
+
+    assertEquals(10, newConfig.numIoThreads)
+    assertEquals(100, newConfig.backgroundThreads)
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    assertEquals(10, newConfig.numIoThreads)
+    assertEquals(100, newConfig.backgroundThreads)
+  }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.