Posted to commits@kafka.apache.org by cm...@apache.org on 2024/03/27 17:57:30 UTC

(kafka) branch 3.6 updated: KAFKA-16411: Correctly migrate default client quota entities (#15584)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new fbff947d259 KAFKA-16411: Correctly migrate default client quota entities (#15584)
fbff947d259 is described below

commit fbff947d25956100b1189fa4ee4ca64ea775df11
Author: Colin P. McCabe <cm...@apache.org>
AuthorDate: Wed Mar 27 09:38:11 2024 -0700

    KAFKA-16411: Correctly migrate default client quota entities (#15584)
    
    KAFKA-16222 fixed a bug whereby we didn't undo the name sanitization used on client quota entity names
    stored in ZooKeeper. However, it incorrectly claimed to fix the handling of default client quota
    entities. It also failed to correctly re-sanitize when synchronizing the data back to ZooKeeper.
    
    This PR fixes ZkConfigMigrationClient to do the sanitization correctly on both the read and write
    paths. We perform de-sanitization before invoking the visitors, since it does not make sense to
    repeat the same de-sanitization step in each and every visitor.
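
    For illustration, here is a minimal sketch of the sanitization round trip this relies on. It is
    not code from this patch; it only uses the existing org.apache.kafka.common.utils.Sanitizer
    helper, and the example value is made up:

        import org.apache.kafka.common.utils.Sanitizer

        // Entity names are sanitized before being used as znode names; for example "@" becomes "%40".
        val znodeName = Sanitizer.sanitize("user@prod")    // "user%40prod"
        // Reading the data back from ZooKeeper requires undoing that sanitization.
        val original = Sanitizer.desanitize(znodeName)     // "user@prod"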
    
    Additionally, this PR fixes a bug causing default entities to be converted incorrectly. For example,
    ClientQuotaEntity(user -> null) is stored under the /config/users/<default> znode in ZooKeeper. In
    KRaft it appears as a ClientQuotaRecord with EntityData(entityType=users, entityName=null).
    Prior to this PR, this was being converted to a ClientQuotaRecord with EntityData(entityType=users,
    entityName=""). That represents a quota on the user whose name is the empty string (yes, we allow
    users to name themselves with the empty string, sadly).
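
    To make the difference concrete, here is an informal sketch (not code from this patch) using the
    generated ClientQuotaRecord.EntityData accessors that the migration code below works with; the
    variable names are just for illustration:

        import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
        import org.apache.kafka.common.quota.ClientQuotaEntity

        // Correct: the default user entity keeps a null entity name in KRaft.
        val defaultUser =
          new EntityData().setEntityType(ClientQuotaEntity.USER).setEntityName(null)

        // Pre-PR bug: the name was set to "", which instead denotes the user literally named "".
        val userNamedEmptyString =
          new EntityData().setEntityType(ClientQuotaEntity.USER).setEntityName("")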
    
    The confusion appears to have arisen because for TOPIC and BROKER configurations, the default
    ConfigResource is indeed the one named with the empty (not null) string. For example, the default
    topic configuration resource is ConfigResource(name="", type=TOPIC).  However, things are different
    for client quotas. Default client quota entities in KRaft (and also in AdminClient) are represented
    by maps with null values. For example, the default User entity is represented by Map("user" ->
    null).  In retrospect, using a map with null values was a poor choice; a Map<String,
    Optional<String>> would have made more sense. However, this is how the API currently works, and we
    have to convert it correctly.
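
    A rough side-by-side of the two conventions (illustrative only; the variable names are made up):

        import java.util.Collections
        import org.apache.kafka.common.config.ConfigResource
        import org.apache.kafka.common.quota.ClientQuotaEntity

        // Default topic configuration: a ConfigResource whose name is the empty string.
        val defaultTopicConfig = new ConfigResource(ConfigResource.Type.TOPIC, "")

        // Default user quota entity: the "user" key maps to null, not to "".
        val defaultUserQuota =
          new ClientQuotaEntity(Collections.singletonMap[String, String](ClientQuotaEntity.USER, null))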
    
    There was an additional level of confusion present in KAFKA-16222 where someone thought that using
    the ZooKeeper placeholder string "<default>" in the AdminClient API would yield a default client
    quota entity. This seems to have been suggested by the ConfigEntityName class that was created
    recently. In fact, <default> is not part of any public API in Kafka. Accordingly, this PR also
    renames ConfigEntityName.DEFAULT to ZooKeeperInternals.DEFAULT_STRING, to make it clear that the
    string <default> is just a detail of the ZooKeeper implementation.  It is not used in the Kafka API
    to indicate defaults. Hopefully this will avoid confusion in the future.
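
    In other words (again just a sketch, not part of this change), these are two different entities
    in the AdminClient API:

        import java.util.Collections
        import org.apache.kafka.common.quota.ClientQuotaEntity

        // A quota for a user literally named "<default>"; the placeholder has no special meaning here.
        val userNamedDefault =
          new ClientQuotaEntity(Collections.singletonMap("user", "<default>"))

        // The actual default user entity in the public API: the map value is null.
        val defaultUser =
          new ClientQuotaEntity(Collections.singletonMap[String, String]("user", null))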
    
    Finally, the PR also creates KRaftClusterTest.testDefaultClientQuotas to get extra test coverage of
    setting default client quotas.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Igor Soarez <so...@apple.com>
    
    Conflicts: Do not backport the changes to create ZooKeeperInternals.DEFAULT_STRING to this branch,
    to make the cherry-pick smaller.
---
 .../main/scala/kafka/zk/ZkMigrationClient.scala    |  4 -
 .../zk/migration/ZkConfigMigrationClient.scala     | 89 ++++++++++++++++------
 .../kafka/server/KRaftClusterTest.scala            | 63 +++++++++++++++
 .../kafka/zk/ZkMigrationIntegrationTest.scala      | 45 ++++++-----
 .../zk/migration/ZkConfigMigrationClientTest.scala | 22 +++---
 5 files changed, 167 insertions(+), 56 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 76e0b47aee8..a11a84c017b 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -27,7 +27,6 @@ import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.metadata._
 import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.security.scram.ScramCredential
-import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.common.{TopicIdPartition, Uuid}
 import org.apache.kafka.metadata.DelegationTokenData
 import org.apache.kafka.metadata.PartitionRegistration
@@ -226,9 +225,6 @@ class ZkMigrationClient(
         entityDataList: util.List[ClientQuotaRecord.EntityData],
         quotas: util.Map[String, lang.Double]
       ): Unit = {
-        entityDataList.forEach(entityData => {
-          entityData.setEntityName(Sanitizer.desanitize(entityData.entityName()))
-        })
         val batch = new util.ArrayList[ApiMessageAndVersion]()
         quotas.forEach((key, value) => {
           batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
diff --git a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
index 55fb048e686..844c1aabc4c 100644
--- a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
@@ -21,6 +21,7 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, DynamicC
 import kafka.utils.{Logging, PasswordEncoder}
 import kafka.zk.ZkMigrationClient.{logAndRethrow, wrapZkException}
 import kafka.zk._
+import kafka.zk.migration.ZkConfigMigrationClient.getSanitizedClientQuotaZNodeName
 import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest}
 import org.apache.kafka.clients.admin.ScramMechanism
 import org.apache.kafka.common.config.types.Password
@@ -29,6 +30,7 @@ import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
 import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
+import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.metadata.migration.ConfigMigrationClient.ClientQuotaVisitor
 import org.apache.kafka.metadata.migration.{ConfigMigrationClient, MigrationClientException, ZkMigrationLeadershipState}
 import org.apache.zookeeper.KeeperException.Code
@@ -49,11 +51,10 @@ class ZkConfigMigrationClient(
 
 
   /**
-   * In ZK, we use the special string "&lt;default&gt;" to represent the default entity.
-   * In KRaft, we use an empty string. This method builds an EntityData that converts the special ZK string
-   * to the special KRaft string.
+   * In ZK, we use the special string "&lt;default&gt;" to represent the default config entity.
+   * In KRaft, we use an empty string. This method converts between the two conventions.
    */
-  private def fromZkEntityName(entityName: String): String = {
+  private def fromZkConfigEntityName(entityName: String): String = {
     if (entityName.equals(ConfigEntityName.Default)) {
       ""
     } else {
@@ -61,7 +62,7 @@ class ZkConfigMigrationClient(
     }
   }
 
-  private def toZkEntityName(entityName: String): String = {
+  private def toZkConfigEntityName(entityName: String): String = {
     if (entityName.isEmpty) {
       ConfigEntityName.Default
     } else {
@@ -69,22 +70,35 @@ class ZkConfigMigrationClient(
     }
   }
 
-  private def buildEntityData(entityType: String, entityName: String): EntityData = {
-    new EntityData().setEntityType(entityType).setEntityName(fromZkEntityName(entityName))
+  private def buildClientQuotaEntityData(
+    entityType: String,
+    znodeName: String
+  ): EntityData = {
+    val result = new EntityData().setEntityType(entityType)
+    if (znodeName.equals(ConfigEntityName.Default)) {
+      // Default __client quota__ entity names are null. This is different than default __configs__,
+      // which have their names set to the empty string instead.
+      result.setEntityName(null)
+    } else {
+      // ZNode names are sanitized before being stored in ZooKeeper.
+      // For example, @ is turned into %40. Undo the sanitization here.
+      result.setEntityName(Sanitizer.desanitize(znodeName))
+    }
+    result
   }
 
 
   override def iterateClientQuotas(visitor: ClientQuotaVisitor): Unit = {
     def migrateEntityType(zkEntityType: String, entityType: String): Unit = {
       adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (name, props) =>
-        val entity = List(buildEntityData(entityType, name)).asJava
+        val entity = List(buildClientQuotaEntityData(entityType, name)).asJava
 
         ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism =>
           val propertyValue = props.getProperty(mechanism.mechanismName)
           if (propertyValue != null) {
             val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue)
             logAndRethrow(this, s"Error in client quota visitor for SCRAM credential. User was $entity.") {
-              visitor.visitScramCredential(name, mechanism, scramCredentials)
+              visitor.visitScramCredential(Sanitizer.desanitize(name), mechanism, scramCredentials)
             }
             props.remove(mechanism.mechanismName)
           }
@@ -105,14 +119,14 @@ class ZkConfigMigrationClient(
     migrateEntityType(ConfigType.User, ClientQuotaEntity.USER)
     migrateEntityType(ConfigType.Client, ClientQuotaEntity.CLIENT_ID)
 
-    adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) =>
+    adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (znodePath, props) =>
       // Taken from ZkAdminManager
-      val components = name.split("/")
+      val components = znodePath.split("/")
       if (components.size != 3 || components(1) != "clients")
-        throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+        throw new IllegalArgumentException(s"Unexpected config path: ${znodePath}")
       val entity = List(
-        buildEntityData(ClientQuotaEntity.USER, components(0)),
-        buildEntityData(ClientQuotaEntity.CLIENT_ID, components(2))
+        buildClientQuotaEntityData(ClientQuotaEntity.USER, components(0)),
+        buildClientQuotaEntityData(ClientQuotaEntity.CLIENT_ID, components(2))
       )
       val quotaMap = props.asScala.map { case (key, value) =>
         val doubleValue = try lang.Double.valueOf(value) catch {
@@ -132,7 +146,7 @@ class ZkConfigMigrationClient(
   override def iterateBrokerConfigs(configConsumer: BiConsumer[String, util.Map[String, String]]): Unit = {
     val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
     zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
-      val brokerResource = fromZkEntityName(broker)
+      val brokerResource = fromZkConfigEntityName(broker)
       val decodedProps = props.asScala.map { case (key, value) =>
         if (DynamicBrokerConfig.isPasswordConfig(key))
           key -> passwordEncoder.decode(value).value
@@ -154,7 +168,7 @@ class ZkConfigMigrationClient(
   }
 
   override def readTopicConfigs(topicName: String, configConsumer: Consumer[util.Map[String, String]]): Unit = {
-    val topicResource = fromZkEntityName(topicName)
+    val topicResource = fromZkConfigEntityName(topicName)
     val props = zkClient.getEntityConfigs(ConfigType.Topic, topicResource)
     val decodedProps = props.asScala.map { case (key, value) =>
       if (DynamicBrokerConfig.isPasswordConfig(key))
@@ -179,7 +193,7 @@ class ZkConfigMigrationClient(
       case _ => None
     }
 
-    val configName = toZkEntityName(configResource.name())
+    val configName = toZkConfigEntityName(configResource.name())
     if (configType.isDefined) {
       val props = new Properties()
       configMap.forEach { case (key, value) =>
@@ -218,7 +232,7 @@ class ZkConfigMigrationClient(
       case _ => None
     }
 
-    val configName = toZkEntityName(configResource.name())
+    val configName = toZkConfigEntityName(configResource.name())
     if (configType.isDefined) {
       val path = ConfigEntityZNode.path(configType.get, configName)
       val requests = Seq(DeleteRequest(path, ZkVersion.MatchAnyVersion))
@@ -247,10 +261,9 @@ class ZkConfigMigrationClient(
     scram: util.Map[String, String],
     state: ZkMigrationLeadershipState
   ): ZkMigrationLeadershipState = wrapZkException {
-    val entityMap = entity.asScala
-    val user = entityMap.get(ClientQuotaEntity.USER).map(toZkEntityName)
-    val client = entityMap.get(ClientQuotaEntity.CLIENT_ID).map(toZkEntityName)
-    val ip = entityMap.get(ClientQuotaEntity.IP).map(toZkEntityName)
+    val user: Option[String] = getSanitizedClientQuotaZNodeName(entity, ClientQuotaEntity.USER)
+    val client: Option[String] = getSanitizedClientQuotaZNodeName(entity, ClientQuotaEntity.CLIENT_ID)
+    val ip: Option[String] = getSanitizedClientQuotaZNodeName(entity, ClientQuotaEntity.IP)
     val props = new Properties()
 
     val (configType, path, configKeys) = if (user.isDefined && client.isEmpty) {
@@ -348,3 +361,35 @@ class ZkConfigMigrationClient(
   }
 }
 
+object ZkConfigMigrationClient {
+  /**
+   * Find the znode name to use for a ClientQuotaEntity.
+   *
+   * @param entity      The client quota entity map. See org.apache.kafka.common.ClientQuotaEntity.
+   * @param component   The component that we want a znode name for.
+   * @return            Some(znodeName) if there is a znode path; None otherwise.
+   */
+  def getSanitizedClientQuotaZNodeName(
+    entity: util.Map[String, String],
+    component: String
+  ): Option[String] = {
+    if (!entity.containsKey(component)) {
+      // There is no znode path, because the component wasn't found. For example, if the
+      // entity was (user -> "bob") and our component was "ip", we would return None here.
+      None
+    } else {
+      val rawValue = entity.get(component)
+      if (rawValue == null) {
+        // A raw value of null means this is a default entity. For example, (user -> null) means
+        // the default user. Yes, this means we stored a null value in the map and it did not mean
+        // "not present." This is an unfortunate API that should be revisited at some point.
+        Some(ConfigEntityName.Default)
+      } else {
+        // We found a non-null value, and now we need to sanitize it. For example, "c@@ldude" will
+        // turn into c%40%40ldude, so that we can use it as a znode name in ZooKeeper.
+        Some(Sanitizer.sanitize(rawValue))
+      }
+    }
+  }
+}
+
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index dbea33f9058..48b985e1732 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.common.quota.ClientQuotaAlteration.Op
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
@@ -324,6 +325,68 @@ class KRaftClusterTest {
     }
   }
 
+  def setConsumerByteRate(
+    admin: Admin,
+    entity: ClientQuotaEntity,
+    value: Long
+  ): Unit = {
+    admin.alterClientQuotas(Collections.singletonList(
+      new ClientQuotaAlteration(entity, Collections.singletonList(
+        new Op("consumer_byte_rate", value.doubleValue()))))).
+        all().get()
+  }
+
+  def getConsumerByteRates(admin: Admin): Map[ClientQuotaEntity, Long] = {
+    val allFilter = ClientQuotaFilter.contains(Collections.emptyList())
+    val results = new java.util.HashMap[ClientQuotaEntity, Long]
+    admin.describeClientQuotas(allFilter).entities().get().forEach {
+      case (entity, entityMap) =>
+        Option(entityMap.get("consumer_byte_rate")).foreach {
+          case value => results.put(entity, value.longValue())
+        }
+    }
+    results.asScala.toMap
+  }
+
+  @Test
+  def testDefaultClientQuotas(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING,
+        "Broker never made it to RUNNING state.")
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val defaultUser = new ClientQuotaEntity(Collections.singletonMap[String, String]("user", null))
+        val bobUser = new ClientQuotaEntity(Collections.singletonMap[String, String]("user", "bob"))
+        TestUtils.retry(30000) {
+          assertEquals(Map(), getConsumerByteRates(admin))
+        }
+        setConsumerByteRate(admin, defaultUser, 100L)
+        TestUtils.retry(30000) {
+          assertEquals(Map(
+              defaultUser -> 100L
+            ), getConsumerByteRates(admin))
+        }
+        setConsumerByteRate(admin, bobUser, 1000L)
+        TestUtils.retry(30000) {
+          assertEquals(Map(
+            defaultUser -> 100L,
+            bobUser -> 1000L
+          ), getConsumerByteRates(admin))
+        }
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
   @Test
   def testCreateClusterWithAdvertisedPortZero(): Unit = {
     val brokerPropertyOverrides: (TestKitNodes, BrokerNode) => Map[String, String] = (nodes, _) => Map(
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index f8fe82ed9d0..187e89e770f 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -17,7 +17,7 @@
 package kafka.zk
 
 import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
-import kafka.server.{ConfigEntityName, ConfigType, ControllerRequestCompletionHandler, KafkaConfig}
+import kafka.server.{ConfigType, ControllerRequestCompletionHandler, KafkaConfig}
 import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
 import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, Type}
 import kafka.test.junit.ClusterTestExtensions
@@ -40,7 +40,7 @@ import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.resource.ResourceType.TOPIC
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
-import org.apache.kafka.common.utils.SecurityUtils
+import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
 import org.apache.kafka.metadata.authorizer.StandardAcl
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
@@ -216,11 +216,11 @@ class ZkMigrationIntegrationTest {
     createTopicResult.all().get(60, TimeUnit.SECONDS)
 
     val quotas = new util.ArrayList[ClientQuotaAlteration]()
-    val defaultUserEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> ConfigEntityName.Default).asJava)
+    val defaultUserEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, null))
     quotas.add(new ClientQuotaAlteration(defaultUserEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava))
-    val defaultClientIdEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> ConfigEntityName.Default).asJava)
+    val defaultClientIdEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, null))
     quotas.add(new ClientQuotaAlteration(defaultClientIdEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava))
-    val defaultIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> null.asInstanceOf[String]).asJava)
+    val defaultIpEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.IP, null))
     quotas.add(new ClientQuotaAlteration(defaultIpEntity, List(new ClientQuotaAlteration.Op("connection_creation_rate", 9.0)).asJava))
     val userEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user/1@prod").asJava)
     quotas.add(new ClientQuotaAlteration(userEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
@@ -264,13 +264,14 @@ class ZkMigrationIntegrationTest {
       assertEquals(10, image.topics().getTopic("test-topic-3").partitions().size())
 
       val clientQuotas = image.clientQuotas().entities()
-      assertEquals(6, clientQuotas.size())
-      assertEquals(true, clientQuotas.containsKey(defaultUserEntity))
-      assertEquals(true, clientQuotas.containsKey(defaultClientIdEntity))
-      assertEquals(true, clientQuotas.containsKey(new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "").asJava))) // default ip
-      assertEquals(true, clientQuotas.containsKey(userEntity))
-      assertEquals(true, clientQuotas.containsKey(userClientEntity))
-      assertEquals(true, clientQuotas.containsKey(ipEntity))
+      assertEquals(new java.util.HashSet[ClientQuotaEntity](java.util.Arrays.asList(
+        defaultUserEntity,
+        defaultClientIdEntity,
+        defaultIpEntity,
+        userEntity,
+        userClientEntity,
+        ipEntity
+      )), clientQuotas.keySet())
     }
 
     migrationState = migrationClient.releaseControllerLeadership(migrationState)
@@ -832,11 +833,14 @@ class ZkMigrationIntegrationTest {
   def alterClientQuotas(admin: Admin): AlterClientQuotasResult = {
     val quotas = new util.ArrayList[ClientQuotaAlteration]()
     quotas.add(new ClientQuotaAlteration(
-      new ClientQuotaEntity(Map("user" -> "user1").asJava),
+      new ClientQuotaEntity(Map("user" -> "user@1").asJava),
       List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
     quotas.add(new ClientQuotaAlteration(
-      new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
+      new ClientQuotaEntity(Map("user" -> "user@1", "client-id" -> "clientA").asJava),
       List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
+    quotas.add(new ClientQuotaAlteration(
+      new ClientQuotaEntity(Collections.singletonMap("user", null)),
+      List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
     quotas.add(new ClientQuotaAlteration(
       new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
       List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
@@ -854,7 +858,7 @@ class ZkMigrationIntegrationTest {
     val alterations = new util.ArrayList[UserScramCredentialAlteration]()
     alterations.add(new UserScramCredentialUpsertion("user1",
         new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8191), "password1"))
-    alterations.add(new UserScramCredentialUpsertion("user2",
+    alterations.add(new UserScramCredentialUpsertion("user@2",
         new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192), "password2"))
     admin.alterUserScramCredentials(alterations)
   }
@@ -869,20 +873,21 @@ class ZkMigrationIntegrationTest {
 
   def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
     TestUtils.retry(10000) {
-      assertEquals("1000", zkClient.getEntityConfigs(ConfigType.User, "user1").getProperty("consumer_byte_rate"))
-      assertEquals("800", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("consumer_byte_rate"))
-      assertEquals("100", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("producer_byte_rate"))
+      assertEquals("1000", zkClient.getEntityConfigs(ConfigType.User, Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
+      assertEquals("900", zkClient.getEntityConfigs(ConfigType.User, "<default>").getProperty("consumer_byte_rate"))
+      assertEquals("800", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("consumer_byte_rate"))
+      assertEquals("100", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("producer_byte_rate"))
       assertEquals("10", zkClient.getEntityConfigs(ConfigType.Ip, "8.8.8.8").getProperty("connection_creation_rate"))
     }
   }
 
   def verifyUserScramCredentials(zkClient: KafkaZkClient): Unit = {
     TestUtils.retry(10000) {
-      val propertyValue1 = zkClient.getEntityConfigs(ConfigType.User, "user1").getProperty("SCRAM-SHA-256")
+      val propertyValue1 = zkClient.getEntityConfigs(ConfigType.User, Sanitizer.sanitize("user1")).getProperty("SCRAM-SHA-256")
       val scramCredentials1 = ScramCredentialUtils.credentialFromString(propertyValue1)
       assertEquals(8191, scramCredentials1.iterations)
 
-      val propertyValue2 = zkClient.getEntityConfigs(ConfigType.User, "user2").getProperty("SCRAM-SHA-256")
+      val propertyValue2 = zkClient.getEntityConfigs(ConfigType.User, Sanitizer.sanitize("user@2")).getProperty("SCRAM-SHA-256")
       assertNotNull(propertyValue2)
       val scramCredentials2 = ScramCredentialUtils.credentialFromString(propertyValue2)
       assertEquals(8192, scramCredentials2.iterations)
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
index 243209d23bf..f04720399a8 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
@@ -141,15 +141,17 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
     RecordTestUtils.replayAllBatches(delta, batches)
     val image = delta.apply()
 
-    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "").asJava)))
-    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "user1").asJava)))
-    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava)))
-    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "", "client-id" -> "").asJava)))
-    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "", "client-id" -> "clientA").asJava)))
-    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("client-id" -> "").asJava)))
-    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("client-id" -> "clientB").asJava)))
-    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("ip" -> "1.1.1.1").asJava)))
-    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("ip" -> "").asJava)))
+    assertEquals(new util.HashSet[ClientQuotaEntity](java.util.Arrays.asList(
+      new ClientQuotaEntity(Map("user" -> null.asInstanceOf[String]).asJava),
+      new ClientQuotaEntity(Map("user" -> "user1").asJava),
+      new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
+      new ClientQuotaEntity(Map("user" -> null.asInstanceOf[String], "client-id" -> null.asInstanceOf[String]).asJava),
+      new ClientQuotaEntity(Map("user" -> null.asInstanceOf[String], "client-id" -> "clientA").asJava),
+      new ClientQuotaEntity(Map("client-id" -> null.asInstanceOf[String]).asJava),
+      new ClientQuotaEntity(Map("client-id" -> "clientB").asJava),
+      new ClientQuotaEntity(Map("ip" -> "1.1.1.1").asJava),
+      new ClientQuotaEntity(Map("ip" -> null.asInstanceOf[String]).asJava))),
+      image.entities().keySet())
   }
 
   @Test
@@ -185,7 +187,7 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
     assertEquals(4, migrationState.migrationZkVersion())
 
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
-      Map(ClientQuotaEntity.USER -> ""),
+      Map(ClientQuotaEntity.USER -> null.asInstanceOf[String]),
       Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
       ConfigType.User, "<default>")
     assertEquals(5, migrationState.migrationZkVersion())