You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2020/03/05 17:08:11 UTC
[kafka] branch 2.5 updated: KAFKA-9661: Propagate includeSynonyms
option to AdminClient in ConfigCommand (#8229)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 3f7af52 KAFKA-9661: Propagate includeSynonyms option to AdminClient in ConfigCommand (#8229)
3f7af52 is described below
commit 3f7af5209705e73c3c6f030030288c21dcb011a9
Author: David Arthur <mu...@gmail.com>
AuthorDate: Thu Mar 5 11:46:37 2020 -0500
KAFKA-9661: Propagate includeSynonyms option to AdminClient in ConfigCommand (#8229)
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 8 +++--
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 34 ++++++++++++++++++++--
2 files changed, 37 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index d8fc59d..562a91a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -28,7 +28,7 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, ListTopicsOptions, Config => JConfig}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, Config => JConfig}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
@@ -367,7 +367,7 @@ object ConfigCommand extends Config {
println(s"Completed updating default config for $entityType in the cluster.")
}
- private def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
+ private[admin] def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
val entityType = opts.entityTypes.head
val entityName = opts.entityNames.headOption
val describeAll = opts.options.has(opts.allOpt)
@@ -426,7 +426,9 @@ object ConfigCommand extends Config {
dynamicConfigSource
val configResource = new ConfigResource(configResourceType, entityName)
- val configs = adminClient.describeConfigs(Collections.singleton(configResource)).all.get(30, TimeUnit.SECONDS)
+ val describeOptions = new DescribeConfigsOptions().includeSynonyms(includeSynonyms)
+ val configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOptions)
+ .all.get(30, TimeUnit.SECONDS)
configs.get(configResource).entries.asScala
.filter(entry => configSourceFilter match {
case Some(configSource) => entry.source == configSource
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index a1f5f39..e938a6d 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -359,6 +359,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
val node = new Node(1, "localhost", 9092)
val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
+ assertFalse("Config synonyms requested unnecessarily", options.includeSynonyms())
assertEquals(1, resources.size)
val resource = resources.iterator.next
assertEquals(resource.`type`, ConfigResource.Type.TOPIC)
@@ -391,6 +392,34 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
@Test
+ def shouldDescribeConfigSynonyms(): Unit = {
+ val resourceName = "my-topic"
+ val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+ "--entity-name", resourceName,
+ "--entity-type", "topics",
+ "--describe",
+ "--all"))
+
+ val resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName)
+ val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+ future.complete(util.Collections.singletonMap(resource, new Config(util.Collections.emptyList[ConfigEntry])))
+ val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+ EasyMock.expect(describeResult.all()).andReturn(future).once()
+
+ val node = new Node(1, "localhost", 9092)
+ val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
+ override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
+ assertTrue("Synonyms not requested", options.includeSynonyms())
+ assertEquals(Set(resource), resources.asScala.toSet)
+ describeResult
+ }
+ }
+ EasyMock.replay(describeResult)
+ ConfigCommand.describeConfig(mockAdminClient, describeOpts)
+ EasyMock.reset(describeResult)
+ }
+
+ @Test
def shouldAddBrokerQuotaConfig(): Unit = {
val alterOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "1",
@@ -539,6 +568,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
+ assertFalse("Config synonyms requested unnecessarily", options.includeSynonyms())
assertEquals(1, resources.size)
val resource = resources.iterator.next
assertEquals(ConfigResource.Type.BROKER, resource.`type`)
@@ -585,7 +615,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
EasyMock.expect(alterResult.all()).andReturn(alterFuture)
val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
- override def describeConfigs(resources: util.Collection[ConfigResource]): DescribeConfigsResult = {
+ override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {
assertEquals(1, resources.size)
val resource = resources.iterator.next
assertEquals(ConfigResource.Type.BROKER_LOGGER, resource.`type`)
@@ -1098,7 +1128,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
class DummyAdminClient(node: Node) extends MockAdminClient(util.Collections.singletonList(node), node) {
- override def describeConfigs(resources: util.Collection[ConfigResource]): DescribeConfigsResult =
+ override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult =
EasyMock.createNiceMock(classOf[DescribeConfigsResult])
override def incrementalAlterConfigs(configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]],
options: AlterConfigsOptions): AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])