You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/19 22:22:56 UTC
[kafka] branch trunk updated: KAFKA-6680: Fix issues related to
Dynamic Broker configs (#4731)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aefe35e KAFKA-6680: Fix issues related to Dynamic Broker configs (#4731)
aefe35e is described below
commit aefe35e493758e439d7dd5cb5aa50762132a5e5d
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Tue Mar 20 03:52:53 2018 +0530
KAFKA-6680: Fix issues related to Dynamic Broker configs (#4731)
- Fix kafkaConfig initialization if there are no dynamic configs defined in ZK.
- Update DynamicListenerConfig.validateReconfiguration() to check that the new listeners are a subset of the listener security protocol map, rather than requiring an exact match
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 5 +-
.../kafka/server/DynamicBrokerConfigTest.scala | 54 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 3236af0..92fd5d7 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -137,6 +137,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+ currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
val adminZkClient = new AdminZkClient(zkClient)
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
@@ -719,8 +720,8 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
val oldListeners = listenersToMap(oldConfig.listeners)
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
- if (newListeners.keySet != newConfig.listenerSecurityProtocolMap.keySet)
- throw new ConfigException(s"Listeners '$newListeners' and listener map '${newConfig.listenerSecurityProtocolMap}' don't match")
+ if (!newListeners.keySet.subsetOf(newConfig.listenerSecurityProtocolMap.keySet))
+ throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.listenerSecurityProtocolMap}'")
newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
val prefix = listenerName.configPrefix
val newListenerProps = immutableListenerConfigs(newConfig, prefix)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 2e3e274..bca98d2 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -24,10 +24,12 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
+import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConverters._
+import scala.collection.Set
class DynamicBrokerConfigTest {
@@ -248,4 +250,56 @@ class DynamicBrokerConfigTest {
newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
}
+
+ @Test
+ def testDynamicListenerConfig(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
+ val oldConfig = KafkaConfig.fromProps(props)
+ val kafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
+ EasyMock.expect(kafkaServer.config).andReturn(oldConfig).anyTimes()
+ EasyMock.replay(kafkaServer)
+
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
+ val newConfig = KafkaConfig(props)
+
+ val dynamicListenerConfig = new DynamicListenerConfig(kafkaServer)
+ dynamicListenerConfig.validateReconfiguration(newConfig)
+ }
+
+ @Test
+ def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
+ val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
+ EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new java.util.Properties()).anyTimes()
+ EasyMock.replay(zkClient)
+
+ val oldConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092))
+ val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig)
+ dynamicBrokerConfig.initialize(zkClient)
+ dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool)
+
+ val newprops = new Properties()
+ newprops.put(KafkaConfig.NumIoThreadsProp, "10")
+ newprops.put(KafkaConfig.BackgroundThreadsProp, "100")
+ dynamicBrokerConfig.updateBrokerConfig(0, newprops)
+ }
}
+
+class TestDynamicThreadPool() extends BrokerReconfigurable {
+
+ override def reconfigurableConfigs: Set[String] = {
+ DynamicThreadPool.ReconfigurableConfigs
+ }
+
+ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+ assertEquals(Defaults.NumIoThreads, oldConfig.numIoThreads)
+ assertEquals(Defaults.BackgroundThreads, oldConfig.backgroundThreads)
+
+ assertEquals(10, newConfig.numIoThreads)
+ assertEquals(100, newConfig.backgroundThreads)
+ }
+
+ override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+ assertEquals(10, newConfig.numIoThreads)
+ assertEquals(100, newConfig.backgroundThreads)
+ }
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.