You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2024/03/26 00:04:16 UTC

(kafka) 02/02: Improve tests

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch KAFKA-16411
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit bf1900e9971cddd6c08e38baf40c9ae04dfc84cc
Author: Colin P. McCabe <cm...@apache.org>
AuthorDate: Mon Mar 25 17:01:37 2024 -0700

    Improve tests
---
 .../kafka/zk/ZkMigrationIntegrationTest.scala      | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 16d68b3ae38..a3e5ecc0814 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.resource.ResourceType.TOPIC
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
-import org.apache.kafka.common.utils.SecurityUtils
+import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
 import org.apache.kafka.metadata.authorizer.StandardAcl
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
@@ -881,11 +881,14 @@ class ZkMigrationIntegrationTest {
   def alterClientQuotas(admin: Admin): AlterClientQuotasResult = {
     val quotas = new util.ArrayList[ClientQuotaAlteration]()
     quotas.add(new ClientQuotaAlteration(
-      new ClientQuotaEntity(Map("user" -> "user1").asJava),
+      new ClientQuotaEntity(Map("user" -> "user@1").asJava),
       List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
     quotas.add(new ClientQuotaAlteration(
-      new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
+      new ClientQuotaEntity(Map("user" -> "user@1", "client-id" -> "clientA").asJava),
       List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
+    quotas.add(new ClientQuotaAlteration(
+      new ClientQuotaEntity(Collections.singletonMap("user", null)),
+      List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
     quotas.add(new ClientQuotaAlteration(
       new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
       List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
@@ -903,7 +906,7 @@ class ZkMigrationIntegrationTest {
     val alterations = new util.ArrayList[UserScramCredentialAlteration]()
     alterations.add(new UserScramCredentialUpsertion("user1",
         new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8191), "password1"))
-    alterations.add(new UserScramCredentialUpsertion("user2",
+    alterations.add(new UserScramCredentialUpsertion("user@2",
         new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192), "password2"))
     admin.alterUserScramCredentials(alterations)
   }
@@ -918,20 +921,21 @@ class ZkMigrationIntegrationTest {
 
   def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
     TestUtils.retry(10000) {
-      assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, "user1").getProperty("consumer_byte_rate"))
-      assertEquals("800", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("consumer_byte_rate"))
-      assertEquals("100", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("producer_byte_rate"))
+      assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
+      assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, "<default>").getProperty("consumer_byte_rate"))
+      assertEquals("800", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("consumer_byte_rate"))
+      assertEquals("100", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("producer_byte_rate"))
       assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, "8.8.8.8").getProperty("connection_creation_rate"))
     }
   }
 
   def verifyUserScramCredentials(zkClient: KafkaZkClient): Unit = {
     TestUtils.retry(10000) {
-      val propertyValue1 = zkClient.getEntityConfigs(ConfigType.USER, "user1").getProperty("SCRAM-SHA-256")
+      val propertyValue1 = zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user1")).getProperty("SCRAM-SHA-256")
       val scramCredentials1 = ScramCredentialUtils.credentialFromString(propertyValue1)
       assertEquals(8191, scramCredentials1.iterations)
 
-      val propertyValue2 = zkClient.getEntityConfigs(ConfigType.USER, "user2").getProperty("SCRAM-SHA-256")
+      val propertyValue2 = zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@2")).getProperty("SCRAM-SHA-256")
       assertNotNull(propertyValue2)
       val scramCredentials2 = ScramCredentialUtils.credentialFromString(propertyValue2)
       assertEquals(8192, scramCredentials2.iterations)