You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/05/10 12:55:33 UTC
[kafka] branch trunk updated: KAFKA-9942: ConfigCommand fails to set client quotas for default users with --bootstrap-server. (#8628)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5583089 KAFKA-9942: ConfigCommand fails to set client quotas for default users with --bootstrap-server. (#8628)
5583089 is described below
commit 5583089df03ac6def37418e6b7c16f4e78ca010d
Author: Brian Byrne <bb...@confluent.io>
AuthorDate: Sun May 10 05:54:08 2020 -0700
KAFKA-9942: ConfigCommand fails to set client quotas for default users with --bootstrap-server. (#8628)
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 35 +++++++++++----
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 50 ++++++++++++++++------
2 files changed, 65 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 991b0f1..2b8ace4 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -362,21 +362,26 @@ object ConfigCommand extends Config {
).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
- case ConfigType.User =>
- case ConfigType.Client =>
- val oldConfig: Map[String, java.lang.Double] = getClientQuotasConfig(adminClient, entityTypes, entityNames)
+ case ConfigType.User | ConfigType.Client =>
+ val oldConfig = getClientQuotasConfig(adminClient, entityTypes, entityNames)
val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
if (invalidConfigs.nonEmpty)
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
- val entity = new ClientQuotaEntity(opts.entityTypes.map { entType =>
+ val alterEntityTypes = entityTypes.map { entType =>
entType match {
case ConfigType.User => ClientQuotaEntity.USER
case ConfigType.Client => ClientQuotaEntity.CLIENT_ID
case _ => throw new IllegalArgumentException(s"Unexpected entity type: ${entType}")
}
- }.zip(opts.entityNames).toMap.asJava)
+ }
+ val alterEntityNames = entityNames.map(en => if (en.nonEmpty) en else null)
+
+ // Explicitly populate a HashMap to ensure nulls are recorded properly.
+ val alterEntityMap = new java.util.HashMap[String, String]
+ alterEntityTypes.zip(alterEntityNames).foreach { case (k, v) => alterEntityMap.put(k, v) }
+ val entity = new ClientQuotaEntity(alterEntityMap)
val alterOptions = new AlterClientQuotasOptions().validateOnly(false)
val alterOps = (configsToBeAddedMap.map { case (key, value) =>
@@ -480,8 +485,18 @@ object ConfigCommand extends Config {
private def describeClientQuotasConfig(adminClient: Admin, entityTypes: List[String], entityNames: List[String]) = {
getAllClientQuotasConfigs(adminClient, entityTypes, entityNames).foreach { case (entity, entries) =>
val entityEntries = entity.entries.asScala
- val entityStr = (entityEntries.get(ClientQuotaEntity.USER).map(u => s"user-principal '${u}'") ++
- entityEntries.get(ClientQuotaEntity.CLIENT_ID).map(c => s"client-id '${c}'")).mkString(", ")
+
+ def entitySubstr(entityType: String): Option[String] =
+ entityEntries.get(entityType).map { name =>
+ val typeStr = entityType match {
+ case ClientQuotaEntity.USER => "user-principal"
+ case ClientQuotaEntity.CLIENT_ID => "client-id"
+ }
+ if (name != null) s"${typeStr} '${name}'"
+ else s"the default ${typeStr}"
+ }
+
+ val entityStr = (entitySubstr(ClientQuotaEntity.USER) ++ entitySubstr(ClientQuotaEntity.CLIENT_ID)).mkString(", ")
val entriesStr = entries.asScala.map(e => s"${e._1}=${e._2}").mkString(", ")
println(s"Configs for ${entityStr} are ${entriesStr}")
}
@@ -501,7 +516,11 @@ object ConfigCommand extends Config {
case Some(_) => throw new IllegalArgumentException(s"Unexpected entity type ${entityTypeOpt.get}")
case None => throw new IllegalArgumentException("More entity names specified than entity types")
}
- entityNameOpt.map(ClientQuotaFilterComponent.ofEntity(entityType, _)).getOrElse(ClientQuotaFilterComponent.ofEntityType(entityType))
+ entityNameOpt match {
+ case Some("") => ClientQuotaFilterComponent.ofDefaultEntity(entityType)
+ case Some(name) => ClientQuotaFilterComponent.ofEntity(entityType, name)
+ case None => ClientQuotaFilterComponent.ofEntityType(entityType)
+ }
}
adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30, TimeUnit.SECONDS).asScala
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 1a7d43e..c793e4a 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
@@ -398,16 +398,32 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
ConfigCommand.alterConfigWithZk(null, createOpts, new TestAdminZkClient(zkClient))
}
- @Test
- def shouldAddClientConfig(): Unit = {
+ def testShouldAddClientConfig(user: Option[String], clientId: Option[String]): Unit = {
+ def toValues(entityName: Option[String], entityType: String, command: String):
+ (Array[String], Option[String], Option[ClientQuotaFilterComponent]) = {
+ entityName match {
+ case Some(null) =>
+ (Array("--entity-type", command, "--entity-default"), Some(null),
+ Some(ClientQuotaFilterComponent.ofDefaultEntity(entityType)))
+ case Some(name) =>
+ (Array("--entity-type", command, "--entity-name", name), Some(name),
+ Some(ClientQuotaFilterComponent.ofEntity(entityType, name)))
+ case None => (Array.empty, None, None)
+ }
+ }
+ val (userArgs, userEntity, userComponent) = toValues(user, ClientQuotaEntity.USER, "users")
+ val (clientIdArgs, clientIdEntity, clientIdComponent) = toValues(clientId, ClientQuotaEntity.CLIENT_ID, "clients")
+
val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
- "--entity-name", "my-client-id",
- "--entity-type", "clients",
"--alter",
"--add-config", "consumer_byte_rate=20000,producer_byte_rate=10000",
- "--delete-config", "request_percentage"))
+ "--delete-config", "request_percentage") ++ userArgs ++ clientIdArgs)
- val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> "my-client-id")).asJava)
+ // Explicitly populate a HashMap to ensure nulls are recorded properly.
+ val entityMap = new java.util.HashMap[String, String]
+ userEntity.foreach(u => entityMap.put(ClientQuotaEntity.USER, u))
+ clientIdEntity.foreach(c => entityMap.put(ClientQuotaEntity.CLIENT_ID, c))
+ val entity = new ClientQuotaEntity(entityMap)
var describedConfigs = false
val describeFuture = new KafkaFutureImpl[util.Map[ClientQuotaEntity, util.Map[String, java.lang.Double]]]
@@ -424,12 +440,10 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
val node = new Node(1, "localhost", 9092)
val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
override def describeClientQuotas(filter: ClientQuotaFilter, options: DescribeClientQuotasOptions): DescribeClientQuotasResult = {
- assertEquals(1, filter.components.size)
assertTrue(filter.strict)
- val component = filter.components.asScala.head
- assertEquals(ClientQuotaEntity.CLIENT_ID, component.entityType)
- assertTrue(component.`match`.isPresent)
- assertEquals("my-client-id", component.`match`.get)
+ val components = filter.components.asScala.toSeq
+ userComponent.foreach(c => assertTrue(components.contains(c)))
+ clientIdComponent.foreach(c => assertTrue(components.contains(c)))
describedConfigs = true
describeResult
}
@@ -459,6 +473,18 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
@Test
+ def shouldAddClientConfig(): Unit = {
+ testShouldAddClientConfig(Some("test-user-1"), Some("test-client-1"))
+ testShouldAddClientConfig(Some("test-user-2"), Some(null))
+ testShouldAddClientConfig(Some("test-user-3"), None)
+ testShouldAddClientConfig(Some(null), Some("test-client-2"))
+ testShouldAddClientConfig(Some(null), Some(null))
+ testShouldAddClientConfig(Some(null), None)
+ testShouldAddClientConfig(None, Some("test-client-3"))
+ testShouldAddClientConfig(None, Some(null))
+ }
+
+ @Test
def shouldAddTopicConfigUsingZookeeper(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "my-topic",