You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/05/10 12:55:33 UTC

[kafka] branch trunk updated: KAFKA-9942: ConfigCommand fails to set client quotas for default users with --bootstrap-server. (#8628)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5583089  KAFKA-9942: ConfigCommand fails to set client quotas for default users with --bootstrap-server. (#8628)
5583089 is described below

commit 5583089df03ac6def37418e6b7c16f4e78ca010d
Author: Brian Byrne <bb...@confluent.io>
AuthorDate: Sun May 10 05:54:08 2020 -0700

    KAFKA-9942: ConfigCommand fails to set client quotas for default users with --bootstrap-server. (#8628)
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala | 35 +++++++++++----
 .../scala/unit/kafka/admin/ConfigCommandTest.scala | 50 ++++++++++++++++------
 2 files changed, 65 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 991b0f1..2b8ace4 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -362,21 +362,26 @@ object ConfigCommand extends Config {
         ).asJavaCollection
         adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
 
-      case ConfigType.User =>
-      case ConfigType.Client =>
-        val oldConfig: Map[String, java.lang.Double] = getClientQuotasConfig(adminClient, entityTypes, entityNames)
+      case ConfigType.User | ConfigType.Client =>
+        val oldConfig = getClientQuotasConfig(adminClient, entityTypes, entityNames)
 
         val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
         if (invalidConfigs.nonEmpty)
           throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
 
-        val entity = new ClientQuotaEntity(opts.entityTypes.map { entType =>
+        val alterEntityTypes = entityTypes.map { entType =>
           entType match {
             case ConfigType.User => ClientQuotaEntity.USER
             case ConfigType.Client => ClientQuotaEntity.CLIENT_ID
             case _ => throw new IllegalArgumentException(s"Unexpected entity type: ${entType}")
           }
-        }.zip(opts.entityNames).toMap.asJava)
+        }
+        val alterEntityNames = entityNames.map(en => if (en.nonEmpty) en else null)
+
+        // Explicitly populate a HashMap to ensure nulls are recorded properly.
+        val alterEntityMap = new java.util.HashMap[String, String]
+        alterEntityTypes.zip(alterEntityNames).foreach { case (k, v) => alterEntityMap.put(k, v) }
+        val entity = new ClientQuotaEntity(alterEntityMap)
 
         val alterOptions = new AlterClientQuotasOptions().validateOnly(false)
         val alterOps = (configsToBeAddedMap.map { case (key, value) =>
@@ -480,8 +485,18 @@ object ConfigCommand extends Config {
   private def describeClientQuotasConfig(adminClient: Admin, entityTypes: List[String], entityNames: List[String]) = {
     getAllClientQuotasConfigs(adminClient, entityTypes, entityNames).foreach { case (entity, entries) =>
       val entityEntries = entity.entries.asScala
-      val entityStr = (entityEntries.get(ClientQuotaEntity.USER).map(u => s"user-principal '${u}'") ++
-        entityEntries.get(ClientQuotaEntity.CLIENT_ID).map(c => s"client-id '${c}'")).mkString(", ")
+
+      def entitySubstr(entityType: String): Option[String] =
+        entityEntries.get(entityType).map { name =>
+          val typeStr = entityType match {
+            case ClientQuotaEntity.USER => "user-principal"
+            case ClientQuotaEntity.CLIENT_ID => "client-id"
+          }
+          if (name != null) s"${typeStr} '${name}'"
+          else s"the default ${typeStr}"
+        }
+
+      val entityStr = (entitySubstr(ClientQuotaEntity.USER) ++ entitySubstr(ClientQuotaEntity.CLIENT_ID)).mkString(", ")
       val entriesStr = entries.asScala.map(e => s"${e._1}=${e._2}").mkString(", ")
       println(s"Configs for ${entityStr} are ${entriesStr}")
     }
@@ -501,7 +516,11 @@ object ConfigCommand extends Config {
         case Some(_) => throw new IllegalArgumentException(s"Unexpected entity type ${entityTypeOpt.get}")
         case None => throw new IllegalArgumentException("More entity names specified than entity types")
       }
-      entityNameOpt.map(ClientQuotaFilterComponent.ofEntity(entityType, _)).getOrElse(ClientQuotaFilterComponent.ofEntityType(entityType))
+      entityNameOpt match {
+        case Some("") => ClientQuotaFilterComponent.ofDefaultEntity(entityType)
+        case Some(name) => ClientQuotaFilterComponent.ofEntity(entityType, name)
+        case None => ClientQuotaFilterComponent.ofEntityType(entityType)
+      }
     }
 
     adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30, TimeUnit.SECONDS).asScala
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 1a7d43e..c793e4a 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.internals.KafkaFutureImpl
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
 import org.apache.kafka.common.utils.Sanitizer
@@ -398,16 +398,32 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     ConfigCommand.alterConfigWithZk(null, createOpts, new TestAdminZkClient(zkClient))
   }
 
-  @Test
-  def shouldAddClientConfig(): Unit = {
+  def testShouldAddClientConfig(user: Option[String], clientId: Option[String]): Unit = {
+    def toValues(entityName: Option[String], entityType: String, command: String):
+        (Array[String], Option[String], Option[ClientQuotaFilterComponent]) = {
+      entityName match {
+        case Some(null) =>
+          (Array("--entity-type", command, "--entity-default"), Some(null),
+            Some(ClientQuotaFilterComponent.ofDefaultEntity(entityType)))
+        case Some(name) =>
+          (Array("--entity-type", command, "--entity-name", name), Some(name),
+            Some(ClientQuotaFilterComponent.ofEntity(entityType, name)))
+        case None => (Array.empty, None, None)
+      }
+    }
+    val (userArgs, userEntity, userComponent) = toValues(user, ClientQuotaEntity.USER, "users")
+    val (clientIdArgs, clientIdEntity, clientIdComponent) = toValues(clientId, ClientQuotaEntity.CLIENT_ID, "clients")
+
     val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
-      "--entity-name", "my-client-id",
-      "--entity-type", "clients",
       "--alter",
       "--add-config", "consumer_byte_rate=20000,producer_byte_rate=10000",
-      "--delete-config", "request_percentage"))
+      "--delete-config", "request_percentage") ++ userArgs ++ clientIdArgs)
 
-    val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> "my-client-id")).asJava)
+    // Explicitly populate a HashMap to ensure nulls are recorded properly.
+    val entityMap = new java.util.HashMap[String, String]
+    userEntity.foreach(u => entityMap.put(ClientQuotaEntity.USER, u))
+    clientIdEntity.foreach(c => entityMap.put(ClientQuotaEntity.CLIENT_ID, c))
+    val entity = new ClientQuotaEntity(entityMap)
 
     var describedConfigs = false
     val describeFuture = new KafkaFutureImpl[util.Map[ClientQuotaEntity, util.Map[String, java.lang.Double]]]
@@ -424,12 +440,10 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     val node = new Node(1, "localhost", 9092)
     val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
       override def describeClientQuotas(filter: ClientQuotaFilter, options: DescribeClientQuotasOptions): DescribeClientQuotasResult = {
-        assertEquals(1, filter.components.size)
         assertTrue(filter.strict)
-        val component = filter.components.asScala.head
-        assertEquals(ClientQuotaEntity.CLIENT_ID, component.entityType)
-        assertTrue(component.`match`.isPresent)
-        assertEquals("my-client-id", component.`match`.get)
+        val components = filter.components.asScala.toSeq
+        userComponent.foreach(c => assertTrue(components.contains(c)))
+        clientIdComponent.foreach(c => assertTrue(components.contains(c)))
         describedConfigs = true
         describeResult
       }
@@ -459,6 +473,18 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
+  def shouldAddClientConfig(): Unit = {
+    testShouldAddClientConfig(Some("test-user-1"), Some("test-client-1"))
+    testShouldAddClientConfig(Some("test-user-2"), Some(null))
+    testShouldAddClientConfig(Some("test-user-3"), None)
+    testShouldAddClientConfig(Some(null), Some("test-client-2"))
+    testShouldAddClientConfig(Some(null), Some(null))
+    testShouldAddClientConfig(Some(null), None)
+    testShouldAddClientConfig(None, Some("test-client-3"))
+    testShouldAddClientConfig(None, Some(null))
+  }
+
+  @Test
   def shouldAddTopicConfigUsingZookeeper(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
       "--entity-name", "my-topic",