You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/05/22 01:18:25 UTC
[kafka] branch 2.5 updated: KAFKA-10004: ConfigCommand fails to find default broker configs without ZK (#8675)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new a947720 KAFKA-10004: ConfigCommand fails to find default broker configs without ZK (#8675)
a947720 is described below
commit a947720a7ecd5b1e1b8350472a3ccfa02959a939
Author: showuon <43...@users.noreply.github.com>
AuthorDate: Tue May 19 09:43:45 2020 +0800
KAFKA-10004: ConfigCommand fails to find default broker configs without ZK (#8675)
Reviewers: Brian Byrne <bb...@confluent.io>, Colin P. McCabe <cm...@apache.org>
(cherry picked from commit ad0850659f5d536d43f09221c941022fc273e6d5)
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 7 ++--
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 48 ++++++++++++++++++----
2 files changed, 45 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 562a91a..5291a64 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -60,6 +60,7 @@ import scala.collection._
*/
object ConfigCommand extends Config {
+ val BrokerDefaultEntityName = ""
val BrokerLoggerConfigType = "broker-loggers"
val BrokerSupportedConfigTypes = Seq(ConfigType.Topic, ConfigType.Broker, BrokerLoggerConfigType)
val DefaultScramIterations = 4096
@@ -378,12 +379,12 @@ object ConfigCommand extends Config {
case ConfigType.Topic =>
adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
case ConfigType.Broker | BrokerLoggerConfigType =>
- adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ ConfigEntityName.Default
+ adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName
})
entities.foreach { entity =>
entity match {
- case "" =>
+ case BrokerDefaultEntityName =>
println(s"Default configs for $entityType in the cluster are:")
case _ =>
val configSourceStr = if (describeAll) "All" else "Dynamic"
@@ -408,7 +409,7 @@ object ConfigCommand extends Config {
Topic.validate(entityName)
(ConfigResource.Type.TOPIC, Some(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG))
case ConfigType.Broker => entityName match {
- case "" =>
+ case BrokerDefaultEntityName =>
(ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
case _ =>
validateBrokerId()
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index e938a6d..b57238d 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -439,12 +439,6 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
@Test
- def shouldAddBrokerDynamicConfig(): Unit = {
- val node = new Node(1, "localhost", 9092)
- verifyAlterBrokerConfig(node, "1", List("--entity-name", "1"))
- }
-
- @Test
def shouldAddBrokerLoggerConfig(): Unit = {
val node = new Node(1, "localhost", 9092)
verifyAlterBrokerLoggerConfig(node, "1", "1", List(
@@ -456,7 +450,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
@Test
def testNoSpecifiedEntityOptionWithDescribeBrokersInZKIsAllowed(): Unit = {
- val optsList = List("--zookeeper", "localhost:9092",
+ val optsList = List("--zookeeper", zkConnect,
"--entity-type", ConfigType.Broker,
"--describe"
)
@@ -546,6 +540,12 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
verifyAlterBrokerConfig(node, "", List("--entity-default"))
}
+ @Test
+ def shouldAddBrokerDynamicConfig(): Unit = {
+ val node = new Node(1, "localhost", 9092)
+ verifyAlterBrokerConfig(node, "1", List("--entity-name", "1"))
+ }
+
def verifyAlterBrokerConfig(node: Node, resourceName: String, resourceOpts: List[String]): Unit = {
val optsList = List("--bootstrap-server", "localhost:9092",
"--entity-type", "brokers",
@@ -592,6 +592,40 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
EasyMock.reset(alterResult, describeResult)
}
+ @Test
+ def shouldDescribeConfigBrokerWithoutEntityName(): Unit = {
+ val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+ "--entity-type", "brokers",
+ "--describe"))
+
+ val BrokerDefaultEntityName = ""
+ val resourceCustom = new ConfigResource(ConfigResource.Type.BROKER, "1")
+ val resourceDefault = new ConfigResource(ConfigResource.Type.BROKER, BrokerDefaultEntityName)
+ val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+ val emptyConfig = new Config(util.Collections.emptyList[ConfigEntry])
+ val resultMap = Map(resourceCustom -> emptyConfig, resourceDefault -> emptyConfig).asJava
+ future.complete(resultMap)
+ val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+ // make sure it will be called 2 times: (1) for broker "1" (2) for default broker ""
+ EasyMock.expect(describeResult.all()).andReturn(future).times(2)
+
+ val node = new Node(1, "localhost", 9092)
+ val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
+ override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
+ assertTrue("Synonyms not requested", options.includeSynonyms())
+ val resource = resources.iterator.next
+ assertEquals(ConfigResource.Type.BROKER, resource.`type`)
+ assertTrue(resourceCustom.name == resource.name || resourceDefault.name == resource.name)
+ assertEquals(1, resources.size)
+ describeResult
+ }
+ }
+ EasyMock.replay(describeResult)
+ ConfigCommand.describeConfig(mockAdminClient, describeOpts)
+ EasyMock.verify(describeResult)
+ EasyMock.reset(describeResult)
+ }
+
def verifyAlterBrokerLoggerConfig(node: Node, resourceName: String, entityName: String,
describeConfigEntries: List[ConfigEntry]): Unit = {
val optsList = List("--bootstrap-server", "localhost:9092",