You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/05/22 01:18:25 UTC

[kafka] branch 2.5 updated: KAFKA-10004: ConfigCommand fails to find default broker configs without ZK (#8675)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new a947720  KAFKA-10004: ConfigCommand fails to find default broker configs without ZK (#8675)
a947720 is described below

commit a947720a7ecd5b1e1b8350472a3ccfa02959a939
Author: showuon <43...@users.noreply.github.com>
AuthorDate: Tue May 19 09:43:45 2020 +0800

    KAFKA-10004: ConfigCommand fails to find default broker configs without ZK (#8675)
    
    Reviewers: Brian Byrne <bb...@confluent.io>, Colin P. McCabe <cm...@apache.org>
    (cherry picked from commit ad0850659f5d536d43f09221c941022fc273e6d5)
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  7 ++--
 .../scala/unit/kafka/admin/ConfigCommandTest.scala | 48 ++++++++++++++++++----
 2 files changed, 45 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 562a91a..5291a64 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -60,6 +60,7 @@ import scala.collection._
  */
 object ConfigCommand extends Config {
 
+  val BrokerDefaultEntityName = ""
   val BrokerLoggerConfigType = "broker-loggers"
   val BrokerSupportedConfigTypes = Seq(ConfigType.Topic, ConfigType.Broker, BrokerLoggerConfigType)
   val DefaultScramIterations = 4096
@@ -378,12 +379,12 @@ object ConfigCommand extends Config {
         case ConfigType.Topic =>
           adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
         case ConfigType.Broker | BrokerLoggerConfigType =>
-          adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ ConfigEntityName.Default
+          adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName
       })
 
     entities.foreach { entity =>
       entity match {
-        case "" =>
+        case BrokerDefaultEntityName =>
           println(s"Default configs for $entityType in the cluster are:")
         case _ =>
           val configSourceStr = if (describeAll) "All" else "Dynamic"
@@ -408,7 +409,7 @@ object ConfigCommand extends Config {
           Topic.validate(entityName)
         (ConfigResource.Type.TOPIC, Some(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG))
       case ConfigType.Broker => entityName match {
-        case "" =>
+        case BrokerDefaultEntityName =>
           (ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
         case _ =>
           validateBrokerId()
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index e938a6d..b57238d 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -439,12 +439,6 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
-  def shouldAddBrokerDynamicConfig(): Unit = {
-    val node = new Node(1, "localhost", 9092)
-    verifyAlterBrokerConfig(node, "1", List("--entity-name", "1"))
-  }
-
-  @Test
   def shouldAddBrokerLoggerConfig(): Unit = {
     val node = new Node(1, "localhost", 9092)
     verifyAlterBrokerLoggerConfig(node, "1", "1", List(
@@ -456,7 +450,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def testNoSpecifiedEntityOptionWithDescribeBrokersInZKIsAllowed(): Unit = {
-    val optsList = List("--zookeeper", "localhost:9092",
+    val optsList = List("--zookeeper", zkConnect,
       "--entity-type", ConfigType.Broker,
       "--describe"
     )
@@ -546,6 +540,12 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     verifyAlterBrokerConfig(node, "", List("--entity-default"))
   }
 
+  @Test
+  def shouldAddBrokerDynamicConfig(): Unit = {
+    val node = new Node(1, "localhost", 9092)
+    verifyAlterBrokerConfig(node, "1", List("--entity-name", "1"))
+  }
+
   def verifyAlterBrokerConfig(node: Node, resourceName: String, resourceOpts: List[String]): Unit = {
     val optsList = List("--bootstrap-server", "localhost:9092",
       "--entity-type", "brokers",
@@ -592,6 +592,40 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     EasyMock.reset(alterResult, describeResult)
   }
 
+  @Test
+  def shouldDescribeConfigBrokerWithoutEntityName(): Unit = {
+    val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+      "--entity-type", "brokers",
+      "--describe"))
+
+    val BrokerDefaultEntityName = ""
+    val resourceCustom = new ConfigResource(ConfigResource.Type.BROKER, "1")
+    val resourceDefault = new ConfigResource(ConfigResource.Type.BROKER, BrokerDefaultEntityName)
+    val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+    val emptyConfig = new Config(util.Collections.emptyList[ConfigEntry])
+    val resultMap = Map(resourceCustom -> emptyConfig, resourceDefault -> emptyConfig).asJava
+    future.complete(resultMap)
+    val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+    // make sure it will be called 2 times: (1) for broker "1" (2) for default broker ""
+    EasyMock.expect(describeResult.all()).andReturn(future).times(2)
+
+    val node = new Node(1, "localhost", 9092)
+    val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
+      override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
+        assertTrue("Synonyms not requested", options.includeSynonyms())
+        val resource = resources.iterator.next
+        assertEquals(ConfigResource.Type.BROKER, resource.`type`)
+        assertTrue(resourceCustom.name == resource.name || resourceDefault.name == resource.name)
+        assertEquals(1, resources.size)
+        describeResult
+      }
+    }
+    EasyMock.replay(describeResult)
+    ConfigCommand.describeConfig(mockAdminClient, describeOpts)
+    EasyMock.verify(describeResult)
+    EasyMock.reset(describeResult)
+  }
+
   def verifyAlterBrokerLoggerConfig(node: Node, resourceName: String, entityName: String,
                                     describeConfigEntries: List[ConfigEntry]): Unit = {
     val optsList = List("--bootstrap-server", "localhost:9092",