You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2024/03/26 00:04:16 UTC
(kafka) 02/02: Improve tests
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch KAFKA-16411
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit bf1900e9971cddd6c08e38baf40c9ae04dfc84cc
Author: Colin P. McCabe <cm...@apache.org>
AuthorDate: Mon Mar 25 17:01:37 2024 -0700
Improve tests
---
.../kafka/zk/ZkMigrationIntegrationTest.scala | 22 +++++++++++++---------
1 file changed, 13 insertions(+), 9 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 16d68b3ae38..a3e5ecc0814 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.resource.ResourceType.TOPIC
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
-import org.apache.kafka.common.utils.SecurityUtils
+import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.authorizer.StandardAcl
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
@@ -881,11 +881,14 @@ class ZkMigrationIntegrationTest {
def alterClientQuotas(admin: Admin): AlterClientQuotasResult = {
val quotas = new util.ArrayList[ClientQuotaAlteration]()
quotas.add(new ClientQuotaAlteration(
- new ClientQuotaEntity(Map("user" -> "user1").asJava),
+ new ClientQuotaEntity(Map("user" -> "user@1").asJava),
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
quotas.add(new ClientQuotaAlteration(
- new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
+ new ClientQuotaEntity(Map("user" -> "user@1", "client-id" -> "clientA").asJava),
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
+ quotas.add(new ClientQuotaAlteration(
+ new ClientQuotaEntity(Collections.singletonMap("user", null)),
+ List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
quotas.add(new ClientQuotaAlteration(
new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
@@ -903,7 +906,7 @@ class ZkMigrationIntegrationTest {
val alterations = new util.ArrayList[UserScramCredentialAlteration]()
alterations.add(new UserScramCredentialUpsertion("user1",
new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8191), "password1"))
- alterations.add(new UserScramCredentialUpsertion("user2",
+ alterations.add(new UserScramCredentialUpsertion("user@2",
new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192), "password2"))
admin.alterUserScramCredentials(alterations)
}
@@ -918,20 +921,21 @@ class ZkMigrationIntegrationTest {
def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
TestUtils.retry(10000) {
- assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, "user1").getProperty("consumer_byte_rate"))
- assertEquals("800", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("consumer_byte_rate"))
- assertEquals("100", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("producer_byte_rate"))
+ assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
+ assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, "<default>").getProperty("consumer_byte_rate"))
+ assertEquals("800", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("consumer_byte_rate"))
+ assertEquals("100", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("producer_byte_rate"))
assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, "8.8.8.8").getProperty("connection_creation_rate"))
}
}
def verifyUserScramCredentials(zkClient: KafkaZkClient): Unit = {
TestUtils.retry(10000) {
- val propertyValue1 = zkClient.getEntityConfigs(ConfigType.USER, "user1").getProperty("SCRAM-SHA-256")
+ val propertyValue1 = zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user1")).getProperty("SCRAM-SHA-256")
val scramCredentials1 = ScramCredentialUtils.credentialFromString(propertyValue1)
assertEquals(8191, scramCredentials1.iterations)
- val propertyValue2 = zkClient.getEntityConfigs(ConfigType.USER, "user2").getProperty("SCRAM-SHA-256")
+ val propertyValue2 = zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@2")).getProperty("SCRAM-SHA-256")
assertNotNull(propertyValue2)
val scramCredentials2 = ScramCredentialUtils.credentialFromString(propertyValue2)
assertEquals(8192, scramCredentials2.iterations)