You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2024/03/27 17:57:30 UTC
(kafka) branch 3.6 updated: KAFKA-16411: Correctly migrate default client quota entities (#15584)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new fbff947d259 KAFKA-16411: Correctly migrate default client quota entities (#15584)
fbff947d259 is described below
commit fbff947d25956100b1189fa4ee4ca64ea775df11
Author: Colin P. McCabe <cm...@apache.org>
AuthorDate: Wed Mar 27 09:38:11 2024 -0700
KAFKA-16411: Correctly migrate default client quota entities (#15584)
KAFKA-16222 fixed a bug whereby we didn't undo the name sanitization used on client quota entity names
stored in ZooKeeper. However, it incorrectly claimed to fix the handling of default client quota
entities. It also failed to correctly re-sanitize when syncronizing the data back to ZooKeeper.
This PR fixes ZkConfigMigrationClient to do the sanitization correctly on both the read and write
paths. We do de-sanitization before invoking the visitors, since after all it does not make sense to
do the same de-sanitization step in each and every visitor.
Additionally, this PR fixes a bug causing default entities to be converted incorrectly. For example,
ClientQuotaEntity(user -> null) is stored under the /config/users/<default> znode in ZooKeeper. In
KRaft it appears as a ClientQuotaRecord with EntityData(entityType=users, entityName=null).
Prior to this PR, this was being converted to a ClientQuotaRecord with EntityData(entityType=users,
entityName=""). That represents a quota on the user whose name is the empty string (yes, we allow
users to name themselves with the empty string, sadly.)
The confusion appears to have arisen because for TOPIC and BROKER configurations, the default
ConfigResource is indeed the one named with the empty (not null) string. For example, the default
topic configuration resource is ConfigResource(name="", type=TOPIC). However, things are different
for client quotas. Default client quota entities in KRaft (and also in AdminClient) are represented
by maps with null values. For example, the default User entity is represented by Map("user" ->
null). In retrospect, using a map with null values was a poor choice; a Map<String,
Optional<String>> would have made more sense. However, this is the way the API currently is and we
have to convert correctly.
There was an additional level of confusion present in KAFKA-16222 where someone thought that using
the ZooKeeper placeholder string "<default>" in the AdminClient API would yield a default client
quota entity. Thise seems to have been suggested by the ConfigEntityName class that was created
recently. In fact, <default> is not part of any public API in Kafka. Accordingly, this PR also
renames ConfigEntityName.DEFAULT to ZooKeeperInternals.DEFAULT_STRING, to make it clear that the
string <default> is just a detail of the ZooKeeper implementation. It is not used in the Kafka API
to indicate defaults. Hopefully this will avoid confusion in the future.
Finally, the PR also creates KRaftClusterTest.testDefaultClientQuotas to get extra test coverage of
setting default client quotas.
Reviewers: Manikumar Reddy <ma...@gmail.com>, Igor Soarez <so...@apple.com>
Conflicts: Do not backport the changes to create ZooKeeperInternals.DEFAULT_STRING to this branch,
to make the cherry-pick smaller.
---
.../main/scala/kafka/zk/ZkMigrationClient.scala | 4 -
.../zk/migration/ZkConfigMigrationClient.scala | 89 ++++++++++++++++------
.../kafka/server/KRaftClusterTest.scala | 63 +++++++++++++++
.../kafka/zk/ZkMigrationIntegrationTest.scala | 45 ++++++-----
.../zk/migration/ZkConfigMigrationClientTest.scala | 22 +++---
5 files changed, 167 insertions(+), 56 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 76e0b47aee8..a11a84c017b 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -27,7 +27,6 @@ import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.metadata._
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.security.scram.ScramCredential
-import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.common.{TopicIdPartition, Uuid}
import org.apache.kafka.metadata.DelegationTokenData
import org.apache.kafka.metadata.PartitionRegistration
@@ -226,9 +225,6 @@ class ZkMigrationClient(
entityDataList: util.List[ClientQuotaRecord.EntityData],
quotas: util.Map[String, lang.Double]
): Unit = {
- entityDataList.forEach(entityData => {
- entityData.setEntityName(Sanitizer.desanitize(entityData.entityName()))
- })
val batch = new util.ArrayList[ApiMessageAndVersion]()
quotas.forEach((key, value) => {
batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
diff --git a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
index 55fb048e686..844c1aabc4c 100644
--- a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
@@ -21,6 +21,7 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, DynamicC
import kafka.utils.{Logging, PasswordEncoder}
import kafka.zk.ZkMigrationClient.{logAndRethrow, wrapZkException}
import kafka.zk._
+import kafka.zk.migration.ZkConfigMigrationClient.getSanitizedClientQuotaZNodeName
import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest}
import org.apache.kafka.clients.admin.ScramMechanism
import org.apache.kafka.common.config.types.Password
@@ -29,6 +30,7 @@ import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
+import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.metadata.migration.ConfigMigrationClient.ClientQuotaVisitor
import org.apache.kafka.metadata.migration.{ConfigMigrationClient, MigrationClientException, ZkMigrationLeadershipState}
import org.apache.zookeeper.KeeperException.Code
@@ -49,11 +51,10 @@ class ZkConfigMigrationClient(
/**
- * In ZK, we use the special string "<default>" to represent the default entity.
- * In KRaft, we use an empty string. This method builds an EntityData that converts the special ZK string
- * to the special KRaft string.
+ * In ZK, we use the special string "<default>" to represent the default config entity.
+ * In KRaft, we use an empty string. This method converts the between the two conventions.
*/
- private def fromZkEntityName(entityName: String): String = {
+ private def fromZkConfigEntityName(entityName: String): String = {
if (entityName.equals(ConfigEntityName.Default)) {
""
} else {
@@ -61,7 +62,7 @@ class ZkConfigMigrationClient(
}
}
- private def toZkEntityName(entityName: String): String = {
+ private def toZkConfigEntityName(entityName: String): String = {
if (entityName.isEmpty) {
ConfigEntityName.Default
} else {
@@ -69,22 +70,35 @@ class ZkConfigMigrationClient(
}
}
- private def buildEntityData(entityType: String, entityName: String): EntityData = {
- new EntityData().setEntityType(entityType).setEntityName(fromZkEntityName(entityName))
+ private def buildClientQuotaEntityData(
+ entityType: String,
+ znodeName: String
+ ): EntityData = {
+ val result = new EntityData().setEntityType(entityType)
+ if (znodeName.equals(ConfigEntityName.Default)) {
+ // Default __client quota__ entity names are null. This is different than default __configs__,
+ // which have their names set to the empty string instead.
+ result.setEntityName(null)
+ } else {
+ // ZNode names are sanitized before being stored in ZooKeeper.
+ // For example, @ is turned into %40. Undo the sanitization here.
+ result.setEntityName(Sanitizer.desanitize(znodeName))
+ }
+ result
}
override def iterateClientQuotas(visitor: ClientQuotaVisitor): Unit = {
def migrateEntityType(zkEntityType: String, entityType: String): Unit = {
adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (name, props) =>
- val entity = List(buildEntityData(entityType, name)).asJava
+ val entity = List(buildClientQuotaEntityData(entityType, name)).asJava
ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism =>
val propertyValue = props.getProperty(mechanism.mechanismName)
if (propertyValue != null) {
val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue)
logAndRethrow(this, s"Error in client quota visitor for SCRAM credential. User was $entity.") {
- visitor.visitScramCredential(name, mechanism, scramCredentials)
+ visitor.visitScramCredential(Sanitizer.desanitize(name), mechanism, scramCredentials)
}
props.remove(mechanism.mechanismName)
}
@@ -105,14 +119,14 @@ class ZkConfigMigrationClient(
migrateEntityType(ConfigType.User, ClientQuotaEntity.USER)
migrateEntityType(ConfigType.Client, ClientQuotaEntity.CLIENT_ID)
- adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) =>
+ adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (znodePath, props) =>
// Taken from ZkAdminManager
- val components = name.split("/")
+ val components = znodePath.split("/")
if (components.size != 3 || components(1) != "clients")
- throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+ throw new IllegalArgumentException(s"Unexpected config path: ${znodePath}")
val entity = List(
- buildEntityData(ClientQuotaEntity.USER, components(0)),
- buildEntityData(ClientQuotaEntity.CLIENT_ID, components(2))
+ buildClientQuotaEntityData(ClientQuotaEntity.USER, components(0)),
+ buildClientQuotaEntityData(ClientQuotaEntity.CLIENT_ID, components(2))
)
val quotaMap = props.asScala.map { case (key, value) =>
val doubleValue = try lang.Double.valueOf(value) catch {
@@ -132,7 +146,7 @@ class ZkConfigMigrationClient(
override def iterateBrokerConfigs(configConsumer: BiConsumer[String, util.Map[String, String]]): Unit = {
val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
- val brokerResource = fromZkEntityName(broker)
+ val brokerResource = fromZkConfigEntityName(broker)
val decodedProps = props.asScala.map { case (key, value) =>
if (DynamicBrokerConfig.isPasswordConfig(key))
key -> passwordEncoder.decode(value).value
@@ -154,7 +168,7 @@ class ZkConfigMigrationClient(
}
override def readTopicConfigs(topicName: String, configConsumer: Consumer[util.Map[String, String]]): Unit = {
- val topicResource = fromZkEntityName(topicName)
+ val topicResource = fromZkConfigEntityName(topicName)
val props = zkClient.getEntityConfigs(ConfigType.Topic, topicResource)
val decodedProps = props.asScala.map { case (key, value) =>
if (DynamicBrokerConfig.isPasswordConfig(key))
@@ -179,7 +193,7 @@ class ZkConfigMigrationClient(
case _ => None
}
- val configName = toZkEntityName(configResource.name())
+ val configName = toZkConfigEntityName(configResource.name())
if (configType.isDefined) {
val props = new Properties()
configMap.forEach { case (key, value) =>
@@ -218,7 +232,7 @@ class ZkConfigMigrationClient(
case _ => None
}
- val configName = toZkEntityName(configResource.name())
+ val configName = toZkConfigEntityName(configResource.name())
if (configType.isDefined) {
val path = ConfigEntityZNode.path(configType.get, configName)
val requests = Seq(DeleteRequest(path, ZkVersion.MatchAnyVersion))
@@ -247,10 +261,9 @@ class ZkConfigMigrationClient(
scram: util.Map[String, String],
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
- val entityMap = entity.asScala
- val user = entityMap.get(ClientQuotaEntity.USER).map(toZkEntityName)
- val client = entityMap.get(ClientQuotaEntity.CLIENT_ID).map(toZkEntityName)
- val ip = entityMap.get(ClientQuotaEntity.IP).map(toZkEntityName)
+ val user: Option[String] = getSanitizedClientQuotaZNodeName(entity, ClientQuotaEntity.USER)
+ val client: Option[String] = getSanitizedClientQuotaZNodeName(entity, ClientQuotaEntity.CLIENT_ID)
+ val ip: Option[String] = getSanitizedClientQuotaZNodeName(entity, ClientQuotaEntity.IP)
val props = new Properties()
val (configType, path, configKeys) = if (user.isDefined && client.isEmpty) {
@@ -348,3 +361,35 @@ class ZkConfigMigrationClient(
}
}
+object ZkConfigMigrationClient {
+ /**
+ * Find the znode name to use for a ClientQuotaEntity.
+ *
+ * @param entity The client quota entity map. See org.apache.kafka.common.ClientQuotaEntity.
+ * @param component The component that we want a znode name for.
+ * @return Some(znodeName) if there is a znode path; None otherwise.
+ */
+ def getSanitizedClientQuotaZNodeName(
+ entity: util.Map[String, String],
+ component: String
+ ): Option[String] = {
+ if (!entity.containsKey(component)) {
+ // There is no znode path, because the component wasn't found. For example, if the
+ // entity was (user -> "bob") and our component was "ip", we would return None here.
+ None
+ } else {
+ val rawValue = entity.get(component)
+ if (rawValue == null) {
+ // A raw value of null means this is a default entity. For example, (user -> null) means
+ // the default user. Yes, this means we stored a null value in the map and it did not mean
+ // "not present." This is an unfortunate API that should be revisited at some point.
+ Some(ConfigEntityName.Default)
+ } else {
+ // We found a non-null value, and now we need to sanitize it. For example, "c@@ldude" will
+ // turn into c%40%40ldude, so that we can use it as a znode name in ZooKeeper.
+ Some(Sanitizer.sanitize(rawValue))
+ }
+ }
+ }
+}
+
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index dbea33f9058..48b985e1732 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.common.quota.ClientQuotaAlteration.Op
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse}
import org.apache.kafka.common.security.auth.KafkaPrincipal
@@ -324,6 +325,68 @@ class KRaftClusterTest {
}
}
+ def setConsumerByteRate(
+ admin: Admin,
+ entity: ClientQuotaEntity,
+ value: Long
+ ): Unit = {
+ admin.alterClientQuotas(Collections.singletonList(
+ new ClientQuotaAlteration(entity, Collections.singletonList(
+ new Op("consumer_byte_rate", value.doubleValue()))))).
+ all().get()
+ }
+
+ def getConsumerByteRates(admin: Admin): Map[ClientQuotaEntity, Long] = {
+ val allFilter = ClientQuotaFilter.contains(Collections.emptyList())
+ val results = new java.util.HashMap[ClientQuotaEntity, Long]
+ admin.describeClientQuotas(allFilter).entities().get().forEach {
+ case (entity, entityMap) =>
+ Option(entityMap.get("consumer_byte_rate")).foreach {
+ case value => results.put(entity, value.longValue())
+ }
+ }
+ results.asScala.toMap
+ }
+
+ @Test
+ def testDefaultClientQuotas(): Unit = {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(1).build()).build()
+ try {
+ cluster.format()
+ cluster.startup()
+ TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.")
+ val admin = Admin.create(cluster.clientProperties())
+ try {
+ val defaultUser = new ClientQuotaEntity(Collections.singletonMap[String, String]("user", null))
+ val bobUser = new ClientQuotaEntity(Collections.singletonMap[String, String]("user", "bob"))
+ TestUtils.retry(30000) {
+ assertEquals(Map(), getConsumerByteRates(admin))
+ }
+ setConsumerByteRate(admin, defaultUser, 100L)
+ TestUtils.retry(30000) {
+ assertEquals(Map(
+ defaultUser -> 100L
+ ), getConsumerByteRates(admin))
+ }
+ setConsumerByteRate(admin, bobUser, 1000L)
+ TestUtils.retry(30000) {
+ assertEquals(Map(
+ defaultUser -> 100L,
+ bobUser -> 1000L
+ ), getConsumerByteRates(admin))
+ }
+ } finally {
+ admin.close()
+ }
+ } finally {
+ cluster.close()
+ }
+ }
+
@Test
def testCreateClusterWithAdvertisedPortZero(): Unit = {
val brokerPropertyOverrides: (TestKitNodes, BrokerNode) => Map[String, String] = (nodes, _) => Map(
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index f8fe82ed9d0..187e89e770f 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -17,7 +17,7 @@
package kafka.zk
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
-import kafka.server.{ConfigEntityName, ConfigType, ControllerRequestCompletionHandler, KafkaConfig}
+import kafka.server.{ConfigType, ControllerRequestCompletionHandler, KafkaConfig}
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
@@ -40,7 +40,7 @@ import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.resource.ResourceType.TOPIC
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
-import org.apache.kafka.common.utils.SecurityUtils
+import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.authorizer.StandardAcl
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
@@ -216,11 +216,11 @@ class ZkMigrationIntegrationTest {
createTopicResult.all().get(60, TimeUnit.SECONDS)
val quotas = new util.ArrayList[ClientQuotaAlteration]()
- val defaultUserEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> ConfigEntityName.Default).asJava)
+ val defaultUserEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, null))
quotas.add(new ClientQuotaAlteration(defaultUserEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava))
- val defaultClientIdEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> ConfigEntityName.Default).asJava)
+ val defaultClientIdEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, null))
quotas.add(new ClientQuotaAlteration(defaultClientIdEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava))
- val defaultIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> null.asInstanceOf[String]).asJava)
+ val defaultIpEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.IP, null))
quotas.add(new ClientQuotaAlteration(defaultIpEntity, List(new ClientQuotaAlteration.Op("connection_creation_rate", 9.0)).asJava))
val userEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user/1@prod").asJava)
quotas.add(new ClientQuotaAlteration(userEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
@@ -264,13 +264,14 @@ class ZkMigrationIntegrationTest {
assertEquals(10, image.topics().getTopic("test-topic-3").partitions().size())
val clientQuotas = image.clientQuotas().entities()
- assertEquals(6, clientQuotas.size())
- assertEquals(true, clientQuotas.containsKey(defaultUserEntity))
- assertEquals(true, clientQuotas.containsKey(defaultClientIdEntity))
- assertEquals(true, clientQuotas.containsKey(new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "").asJava))) // default ip
- assertEquals(true, clientQuotas.containsKey(userEntity))
- assertEquals(true, clientQuotas.containsKey(userClientEntity))
- assertEquals(true, clientQuotas.containsKey(ipEntity))
+ assertEquals(new java.util.HashSet[ClientQuotaEntity](java.util.Arrays.asList(
+ defaultUserEntity,
+ defaultClientIdEntity,
+ defaultIpEntity,
+ userEntity,
+ userClientEntity,
+ ipEntity
+ )), clientQuotas.keySet())
}
migrationState = migrationClient.releaseControllerLeadership(migrationState)
@@ -832,11 +833,14 @@ class ZkMigrationIntegrationTest {
def alterClientQuotas(admin: Admin): AlterClientQuotasResult = {
val quotas = new util.ArrayList[ClientQuotaAlteration]()
quotas.add(new ClientQuotaAlteration(
- new ClientQuotaEntity(Map("user" -> "user1").asJava),
+ new ClientQuotaEntity(Map("user" -> "user@1").asJava),
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
quotas.add(new ClientQuotaAlteration(
- new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
+ new ClientQuotaEntity(Map("user" -> "user@1", "client-id" -> "clientA").asJava),
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
+ quotas.add(new ClientQuotaAlteration(
+ new ClientQuotaEntity(Collections.singletonMap("user", null)),
+ List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
quotas.add(new ClientQuotaAlteration(
new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
@@ -854,7 +858,7 @@ class ZkMigrationIntegrationTest {
val alterations = new util.ArrayList[UserScramCredentialAlteration]()
alterations.add(new UserScramCredentialUpsertion("user1",
new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8191), "password1"))
- alterations.add(new UserScramCredentialUpsertion("user2",
+ alterations.add(new UserScramCredentialUpsertion("user@2",
new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192), "password2"))
admin.alterUserScramCredentials(alterations)
}
@@ -869,20 +873,21 @@ class ZkMigrationIntegrationTest {
def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
TestUtils.retry(10000) {
- assertEquals("1000", zkClient.getEntityConfigs(ConfigType.User, "user1").getProperty("consumer_byte_rate"))
- assertEquals("800", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("consumer_byte_rate"))
- assertEquals("100", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("producer_byte_rate"))
+ assertEquals("1000", zkClient.getEntityConfigs(ConfigType.User, Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
+ assertEquals("900", zkClient.getEntityConfigs(ConfigType.User, "<default>").getProperty("consumer_byte_rate"))
+ assertEquals("800", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("consumer_byte_rate"))
+ assertEquals("100", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("producer_byte_rate"))
assertEquals("10", zkClient.getEntityConfigs(ConfigType.Ip, "8.8.8.8").getProperty("connection_creation_rate"))
}
}
def verifyUserScramCredentials(zkClient: KafkaZkClient): Unit = {
TestUtils.retry(10000) {
- val propertyValue1 = zkClient.getEntityConfigs(ConfigType.User, "user1").getProperty("SCRAM-SHA-256")
+ val propertyValue1 = zkClient.getEntityConfigs(ConfigType.User, Sanitizer.sanitize("user1")).getProperty("SCRAM-SHA-256")
val scramCredentials1 = ScramCredentialUtils.credentialFromString(propertyValue1)
assertEquals(8191, scramCredentials1.iterations)
- val propertyValue2 = zkClient.getEntityConfigs(ConfigType.User, "user2").getProperty("SCRAM-SHA-256")
+ val propertyValue2 = zkClient.getEntityConfigs(ConfigType.User, Sanitizer.sanitize("user@2")).getProperty("SCRAM-SHA-256")
assertNotNull(propertyValue2)
val scramCredentials2 = ScramCredentialUtils.credentialFromString(propertyValue2)
assertEquals(8192, scramCredentials2.iterations)
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
index 243209d23bf..f04720399a8 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
@@ -141,15 +141,17 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
RecordTestUtils.replayAllBatches(delta, batches)
val image = delta.apply()
- assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "").asJava)))
- assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "user1").asJava)))
- assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava)))
- assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "", "client-id" -> "").asJava)))
- assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "", "client-id" -> "clientA").asJava)))
- assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("client-id" -> "").asJava)))
- assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("client-id" -> "clientB").asJava)))
- assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("ip" -> "1.1.1.1").asJava)))
- assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("ip" -> "").asJava)))
+ assertEquals(new util.HashSet[ClientQuotaEntity](java.util.Arrays.asList(
+ new ClientQuotaEntity(Map("user" -> null.asInstanceOf[String]).asJava),
+ new ClientQuotaEntity(Map("user" -> "user1").asJava),
+ new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
+ new ClientQuotaEntity(Map("user" -> null.asInstanceOf[String], "client-id" -> null.asInstanceOf[String]).asJava),
+ new ClientQuotaEntity(Map("user" -> null.asInstanceOf[String], "client-id" -> "clientA").asJava),
+ new ClientQuotaEntity(Map("client-id" -> null.asInstanceOf[String]).asJava),
+ new ClientQuotaEntity(Map("client-id" -> "clientB").asJava),
+ new ClientQuotaEntity(Map("ip" -> "1.1.1.1").asJava),
+ new ClientQuotaEntity(Map("ip" -> null.asInstanceOf[String]).asJava))),
+ image.entities().keySet())
}
@Test
@@ -185,7 +187,7 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
assertEquals(4, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
- Map(ClientQuotaEntity.USER -> ""),
+ Map(ClientQuotaEntity.USER -> null.asInstanceOf[String]),
Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
ConfigType.User, "<default>")
assertEquals(5, migrationState.migrationZkVersion())