You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2020/03/05 17:08:11 UTC

[kafka] branch 2.5 updated: KAFKA-9661: Propagate includeSynonyms option to AdminClient in ConfigCommand (#8229)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 3f7af52  KAFKA-9661: Propagate includeSynonyms option to AdminClient in ConfigCommand (#8229)
3f7af52 is described below

commit 3f7af5209705e73c3c6f030030288c21dcb011a9
Author: David Arthur <mu...@gmail.com>
AuthorDate: Thu Mar 5 11:46:37 2020 -0500

    KAFKA-9661: Propagate includeSynonyms option to AdminClient in ConfigCommand (#8229)
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  8 +++--
 .../scala/unit/kafka/admin/ConfigCommandTest.scala | 34 ++++++++++++++++++++--
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index d8fc59d..562a91a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -28,7 +28,7 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, ListTopicsOptions, Config => JConfig}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, Config => JConfig}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.errors.InvalidConfigurationException
@@ -367,7 +367,7 @@ object ConfigCommand extends Config {
       println(s"Completed updating default config for $entityType in the cluster.")
   }
 
-  private def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
+  private[admin] def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
     val entityType = opts.entityTypes.head
     val entityName = opts.entityNames.headOption
     val describeAll = opts.options.has(opts.allOpt)
@@ -426,7 +426,9 @@ object ConfigCommand extends Config {
       dynamicConfigSource
 
     val configResource = new ConfigResource(configResourceType, entityName)
-    val configs = adminClient.describeConfigs(Collections.singleton(configResource)).all.get(30, TimeUnit.SECONDS)
+    val describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms)
+    val configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOptions)
+      .all.get(30, TimeUnit.SECONDS)
     configs.get(configResource).entries.asScala
       .filter(entry => configSourceFilter match {
         case Some(configSource) => entry.source == configSource
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index a1f5f39..e938a6d 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -359,6 +359,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     val node = new Node(1, "localhost", 9092)
     val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
       override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
+        assertFalse("Config synonyms requested unnecessarily", options.includeSynonyms())
         assertEquals(1, resources.size)
         val resource = resources.iterator.next
         assertEquals(resource.`type`, ConfigResource.Type.TOPIC)
@@ -391,6 +392,34 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
+  def shouldDescribeConfigSynonyms(): Unit = {
+    // Describing a topic with --all must request config synonyms from the admin client.
+    val resourceName = "my-topic"
+    val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+      "--entity-name", resourceName, "--entity-type", "topics",
+      "--describe", "--all"))
+
+    val resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName)
+    val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+    future.complete(util.Collections.singletonMap(resource, new Config(util.Collections.emptyList[ConfigEntry])))
+    val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+    EasyMock.expect(describeResult.all()).andReturn(future).once()
+
+    val node = new Node(1, "localhost", 9092)
+    val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
+      override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
+        assertTrue("Synonyms not requested", options.includeSynonyms())
+        assertEquals(Set(resource), resources.asScala.toSet)
+        describeResult
+      }
+    }
+    EasyMock.replay(describeResult)
+    ConfigCommand.describeConfig(mockAdminClient, describeOpts)
+    // verify (rather than reset) so the test fails if the describe result was never consumed
+    EasyMock.verify(describeResult)
+  }
+
+  @Test
   def shouldAddBrokerQuotaConfig(): Unit = {
     val alterOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
       "--entity-name", "1",
@@ -539,6 +568,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
 
     val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
       override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
+        assertFalse("Config synonyms requested unnecessarily", options.includeSynonyms())
         assertEquals(1, resources.size)
         val resource = resources.iterator.next
         assertEquals(ConfigResource.Type.BROKER, resource.`type`)
@@ -585,7 +615,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     EasyMock.expect(alterResult.all()).andReturn(alterFuture)
 
     val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
-      override def describeConfigs(resources: util.Collection[ConfigResource]): DescribeConfigsResult = {
+      override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
         assertEquals(1, resources.size)
         val resource = resources.iterator.next
         assertEquals(ConfigResource.Type.BROKER_LOGGER, resource.`type`)
@@ -1098,7 +1128,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   class DummyAdminClient(node: Node) extends MockAdminClient(util.Collections.singletonList(node), node) {
-    override def describeConfigs(resources: util.Collection[ConfigResource]): DescribeConfigsResult =
+    override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult =
       EasyMock.createNiceMock(classOf[DescribeConfigsResult])
     override def incrementalAlterConfigs(configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]],
       options: AlterConfigsOptions): AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])